이것은 딥러닝 모델으로 합성곱 신경망 구조를 활용해 텍스트 유사도를 측정한다.
기본적인 구조는 이전의 합성곱 모델과 유사하지만 이번은 데이터가 두 개의 텍스트 문장으로 되어 있기에 병렬적 구조를 가진 모델을 만들어야한다고 한다.
CNN은 문장에 대한 의미 벡터를 합성곱 신경망을 통해 추출해서 그 벡터에 대한 유사도 측정 분석 모델
예를 들어 모델에 입력하고자 하는 데이터는 문장 2개로, 문장에 대한 유사도를 보기 위해 기준이 되는 문장을 '기준 문장' 그리고 이에 대해 비교해야하는 문장을 '대상 문장'이라 한다.
책에 따르면 이런 구조를 가지고 있는데 기준 문장인 I Love deep NLP와 대상 문장인 Deep NLP is awesome은 의미가 굉장히 유사하므로 유사도 점수도 높게 나올 것이다.
모델에 데이터를 입력하기 전에 문장에 대한 인덱싱을 하고 문자열 형태의 문장을 인덱스 벡터 형태로 구성한다. 인덱스 벡터로 구성된 문장 정보는 임베딩 과정을 통해 각 단어들이 임베딩 벡터로 바뀐 행령로 구성됨.
임베딩 과정으로 나온 문장 행렬은 기준 문장, 대상 문장 각각에 해당하는 CNN 블록을 거치게 하고 CNN 블록은 합성곱 층과 맥스 풀링층을 합친 하나의 신경망을 말한다. 두 블록을 거쳐 나온 벡터는 문장에 대한 의미 벡터가 됨.( 두 문장에 대한 의미 벡터를 가지고 여러 방식으로 유사도를 구할 수 있다고 함.)
import tensorflow.compat.v1 as tf
import numpy as np
import os
from sklearn.model_selection import train_test_split
import json
DATA_IN_PATH='./data_in2/'
DATA_OUT_PATH='./data_out2/'
TRAIN_Q1_DATA_FILE='train_q1.npy'
TRAIN_Q2_DATA_FILE='train_q2.npy'
TRAIN_LABEL_DATA_FILE='train_label.npy'
DATA_CONFIGS='data_configs.json'
TEST_SPLIT = 0.1
RNG_SEED = 13371447
EPOCH=1
BATCH_SIZE=1024
MAX_SEQUENCE_LENGTH = 31
WORD_EMBEDDING_DIM = 100
CONV_FEATURE_DIM = 300
CONV_OUTPUT_DIM = 128
CONV_WINDOW_SIZE = 3
SIMILARITY_DENSE_FEATURE_DIM = 200
여기서 q1이 기준 문장이 되고, q2가 기준 문장과 비교할 대상 문장이 된다. 이제 파일들을 불러온다.
q1_data=np.load(open(DATA_IN_PATH+TRAIN_Q1_DATA_FILE,'rb'))
q2_data=np.load(open(DATA_IN_PATH+TRAIN_Q2_DATA_FILE,'rb'))
labels=np.load(open(DATA_IN_PATH+TRAIN_LABEL_DATA_FILE,'rb'))
prepro_configs=None
with open(DATA_IN_PATH+DATA_CONFIGS,'r') as f:
prepro_configs=json.load(f)
VOCAB_SIZE = prepro_configs['vocab_size']
이제 각 데이터를 학습셋과 검증셋으로 나눈다.
X=np.stack((q1_data, q2_data), axis=1)
y=labels
train_X, eval_X, train_y, eval_y=train_test_split(X,y,test_size=TEST_SPLIT, random_state=RNG_SEED)
train_Q1=train_X[:,0]
train_Q2=train_X[:,1]
eval_Q1=eval_X[:,0]
eval_Q2=eval_X[:,1]
train_test_split은 입력 데이터 X와 라벨 y로 구성된 각 데이터 배열에 대해서만 학습셋과 평가셋으로 나누게 되어 있다. 그래서 여기는 np.stack으로 q1_data, q2_data 두 데이터를 합친 후 사용.
이렇게 학습, 검증으로 나눴으면 이제 다시 두 질문을 나눠야 한다. 학습 데이터와 검증 데이터 각각에 대해 두 개의 질문 데이터로 나눠서 만든다.
이제 에스티메이터에 활용할 데이터 입력 함수를 만든다. map 함수, 학습 입력 함수, 검증 입력 함수다.
def rearrange(base, hypothesis, label):
features = {"x1": base, "x2": hypothesis}
return features, label
def train_input_fn():
dataset = tf.data.Dataset.from_tensor_slices((train_Q1, train_Q2, train_y))
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.map(rearrange)
dataset = dataset.repeat(EPOCH)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
def eval_input_fn():
dataset = tf.data.Dataset.from_tensor_slices((eval_Q1, eval_Q2, eval_y))
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.map(rearrange)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
rearrange 함수는 map 함수로, 세 개의 인자가 들어오는데 각각 기준 질문, 대상 질문, 라벨값이다.
들어오면 두 개의 질문을 하나의 딕셔너리 형태의 입력값으로 만들고 만든 딕셔너리와 라벨을 리턴하는 구조다. 이 함수를 학습 입력 함수와 검증 입력 함수에 적용한다.
그리고 나머지 두 개의 입력함수는 각각 학습과 검증을 위한 입력함수로 다른 점은 학습은 정의한 에폭 수 만큼 반복하지만, 평가 함수의 경우 반복할 필요가 없다는 것이다.
모델 함수 구현 전에 CNN 블록 함수를 먼저 정의한다. CNN 블록 함수는 합성곱 신경망과 풀링, Dense를 하나로 합친 형태로 정의할 것. 세 개의 기법을 모델에서 각각 정의하는 것이 아니라 이 함수를 사용해 한 번에 사용할 수 있게 하기 위함이다.
def basic_conv_sementic_network(inputs, name):
conv_layer = tf.keras.layers.Conv1D(CONV_FEATURE_DIM,
CONV_WINDOW_SIZE,
activation=tf.nn.relu,
name=name + 'conv_1d',
padding='same')(inputs)
max_pool_layer = tf.keras.layers.MaxPool1D(MAX_SEQUENCE_LENGTH,
1)(conv_layer)
output_layer = tf.keras.layers.Dense(CONV_OUTPUT_DIM,
activation=tf.nn.relu,
name=name + 'dense')(max_pool_layer)
output_layer = tf.squeeze(output_layer, 1)
return output_layer
이 함수는 2인자를 받는데 각 입력값과 이름이다. 이전에 봤던 CNN 모델을 사용한다.
이제 모델 함수를 보면.
def model_fn(features, labels, mode):
TRAIN = mode == tf.estimator.ModeKeys.TRAIN
EVAL = mode == tf.estimator.ModeKeys.EVAL
PREDICT = mode == tf.estimator.ModeKeys.PREDICT
embedding = tf.keras.layers.Embedding(VOCAB_SIZE,
WORD_EMBEDDING_DIM)
base_embedded_matrix = embedding(features['x1'])
hypothesis_embedded_matrix = embedding(features['x2'])
base_embedded_matrix = tf.keras.layers.Dropout(0.2)(base_embedded_matrix)
hypothesis_embedded_matrix = tf.keras.layers.Dropout(0.2)(hypothesis_embedded_matrix)
base_sementic_matrix = basic_conv_sementic_network(base_embedded_matrix, 'base')
hypothesis_sementic_matrix = basic_conv_sementic_network(hypothesis_embedded_matrix, 'hypothesis')
merged_matrix = tf.concat([base_sementic_matrix, hypothesis_sementic_matrix], -1)
similarity_dense_layer = tf.keras.layers.Dense(SIMILARITY_DENSE_FEATURE_DIM,
activation=tf.nn.relu)(merged_matrix)
similarity_dense_layer = tf.keras.layers.Dropout(0.2)(similarity_dense_layer)
logit_layer = tf.keras.layers.Dense(1)(similarity_dense_layer)
logit_layer = tf.squeeze(logit_layer, 1)
similarity = tf.nn.sigmoid(logit_layer)
if PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions={
'is_duplicate':similarity
})
loss = tf.losses.sigmoid_cross_entropy(labels, logit_layer)
if EVAL:
accuracy = tf.metrics.accuracy(labels, tf.round(similarity))
return tf.estimator.EstimatorSpec(
mode=mode,
eval_metric_ops= {'acc': accuracy},
loss=loss)
if TRAIN:
global_step = tf.train.get_global_step()
train_op = tf.train.AdamOptimizer(1e-3).minimize(loss, global_step)
return tf.estimator.EstimatorSpec(
mode=mode,
train_op=train_op,
loss=loss)
부분적으로 확인해 본다.
def model_fn(features, labels, mode):
TRAIN = mode == tf.estimator.ModeKeys.TRAIN
EVAL = mode == tf.estimator.ModeKeys.EVAL
PREDICT = mode == tf.estimator.ModeKeys.PREDICT
3개의 인자를 받는데 입력값, 라벨, 모델 함수가 사용된 모드이다. 그리고 받은 모드의 상태를 변수로 할당한다.
embedding = tf.keras.layers.Embedding(VOCAB_SIZE,WORD_EMBEDDING_DIM)
base_embedded_matrix = embedding(features['x1'])
hypothesis_embedded_matrix = embedding(features['x2'])
입력값을 임베딩 처리하고, 임베딩 객체를 생성한 후 해당 객체를 사용해 기준 문장과 대상 문장을 임베딩 벡터로 만든다. 이제 임베딩된 두 값을 정의한 CNN 블록 함수에 적용한다.
base_sementic_matrix = basic_conv_sementic_network(base_embedded_matrix, 'base')
hypothesis_sementic_matrix = basic_conv_sementic_network(hypothesis_embedded_matrix, 'hypothesis')
임베딩 층을 거친 기준, 대상 문장 메트릭스는 CNN블록을 거치게 하고, 이 블록은 위에 임베딩 층과는 별개로 각 문장에 대한 CNN 블록을 만들어 줌. 여기서 기준 문장이라는 의미의 'base', 대상 문장이라는 의미의 'hypothesis'를 이름으로 인자를 입력했다.
merged_matrix = tf.concat([base_sementic_matrix, hypothesis_sementic_matrix], -1)
similarity_dense_layer = tf.keras.layers.Dense(SIMILARITY_DENSE_FEATURE_DIM,
activation=tf.nn.relu)(merged_matrix)
similarity_dense_layer = tf.keras.layers.Dropout(0.2)(similarity_dense_layer)
logit_layer = tf.keras.layers.Dense(1)(similarity_dense_layer)
logit_layer = tf.squeeze(logit_layer, 1)
similarity = tf.nn.sigmoid(logit_layer)
CNN 블록을 통해 하나의 벡터로 만들어진 두 질문에 대해 유사도를 측정해야 한다. 유사도 측정을 위해 두 벡터에 대한 코사인 유사도 점수나 유클리디안 거리 점수를 활용할 수 있다. 여기서 Dense층에 이런 유사도 점수를 연산할 역할을 준다.
우선 concat함수로 두 질문을 하나의 벡터로 만들고 두 개의 Dense층을 거친다. 첫 번째 Dense층을 통해 나온 값에 드롭아웃을 적용하고 마지막 Dense층을 적용한다. 두 Dense 층의 차원 수는 첫 번째의 경우 사전에 정의한 차원을 적용하고 마지막 Dense층은 차원을 1로 설정. 이렇게 하면 두 문장에 대해 측정한 유사도 값이 마지막 값으로 나온다.
이 라벨과 비교해서 학습하기 위해 squeeze 함수를 적용한다. 그리고 마지막으로 예측값을 만들기 위해 시그모이드 함수로 0~1사이의 값을 만든다.
여기까지가 모델 연산 부분이고 이제 모드에 맞춰 리턴한다. 예측 모드일 때는 예측값만 전달하면 되므로
if PREDICT:
return tf.estimator.EstimatorSpec(
mode=mode,
predictions={
'is_duplicate':similarity
})
예측값을 측정한 유사도 값을 딕셔너리로 전달하고 있다. 이제 검증과 학습 상태의 경우인데, 검증과 학습의 경우 손실값을 측정해야 한다. 그리고 검증 상태는 계산한 손실값을 전달하고 학습 상태에서는 이 앖으로 학습을 진행한다. 우선 손실값을 계산하고 검증 상태인 경우에 대해 함수를 리턴한다.
loss = tf.losses.sigmoid_cross_entropy(labels, logit_layer)
if EVAL:
accuracy = tf.metrics.accuracy(labels, tf.round(similarity))
return tf.estimator.EstimatorSpec(
mode=mode,
eval_metric_ops= {'acc': accuracy},
loss=loss)
우선 손실값을 계산하고, 여기서는 시그모이드를 적용하기 전인 logits 값과 라벨로 손실일 계산한다. 그리고 검증 모드인 경우 함수 리턴 부분을 정의하는데, 정확도 값을 리턴하기 위해 값을 계산함.
마지막으로 학습 상태인 경우 단순히 손실값과 정확도를 측정하는 것보다 모델을 학습시켜서 가중치를 최적화 하기에 tf.train.AdamOptimizer생성 후 손실값 적용.
if TRAIN:
global_step = tf.train.get_global_step()
train_op = tf.train.AdamOptimizer(1e-3).minimize(loss, global_step)
return tf.estimator.EstimatorSpec(
mode=mode,
train_op=train_op,
loss=loss)
우선 몇 번 학습이 진행됐는지 확인하기 위해 get_global_step함수로 스텝값을 받고, 이 값과 손실값을 AdamOptimizer에 넣어 학습시킨다. 그리고 이 값과 손실값을 리턴하여 끝낸다.
이제 모델 준비가 끝났고 실제 학습을 위해 Estimator 객체를 만든다.
os.environ["CUDA_VISIBLE_DEVICES"]="6" #For TEST
model_dir = os.path.join(os.getcwd(), DATA_OUT_PATH + "checkpoint/cnn/")
os.makedirs(model_dir, exist_ok=True)
est = tf.estimator.Estimator(model_fn, model_dir=model_dir)
Estimator 객체를 생성할 때 해당 경로를 설정하고 Estimator 객체의 train을 호출하면 된다.
est.train(train_input_fn) #train
학습을 몇 번 진행할 지 에폭수를 설정했고, train함수를 호출한다. 이 함수를 호출할 때 데이터를 모델에 입력하기 위해 데이터 입력함수를 인자로 전달한다. 학습이 끝나면 최종 손실값을 확인할 수 있다.
학습이 끝났다면 검증 데이터로 모델 검증을 한다.
est.evaluate(eval_input_fn) #eval
위를 입력하면 손실값과 정확도를 알 수 있다. 이제 평가데이터에 대해 모델 적용 후 결과를 캐글에 제출한다.
TEST_Q1_DATA_FILE = 'test_q1.npy'
TEST_Q2_DATA_FILE = 'test_q2.npy'
TEST_ID_DATA_FILE = 'test_id.npy'
test_q1_data = np.load(open(DATA_IN_PATH + TEST_Q1_DATA_FILE, 'rb'),allow_pickle=True)
test_q2_data = np.load(open(DATA_IN_PATH + TEST_Q2_DATA_FILE, 'rb'),allow_pickle=True)
test_id_data = np.load(open(DATA_IN_PATH + TEST_ID_DATA_FILE, 'rb'),allow_pickle=True)
평가 데이터를 불러왓으면 Estimator 객체의 predict함수로 예측 결과를 만든다. 우선 그러기 위해서 입력 함수가 필요한데, 다음과 같이 입력함수를 만든다.
predict_input_fn = tf.estimator.inputs.numpy_input_fn(x={"x1":test_q1_data,"x2":test_q2_data},shuffle=False)
학습 입력 함수나 검증 함수처럼 별도 함수를 안쓰고 estimator의 numpy_input_fb함수를 사용하고 입력 형태는 앞서 다른 입력 함수와 마찬가지로 두 질문을 딕셔너리 형태로 만들었다. 그리고 shuffle을 False로 설정하는 이유는 두 개의 질문쌍이 같은 순서로 입력돼야 하기 때문이다. 이제 predict을 실행한다.
predictions = np.array([p['is_duplicate'] for p in est.predict(input_fn=predict_input_fn)])
이렇게 평가 입력 데이터를 구현했으면 이제 모델을 통해 예측할 수 있다. 이때 받고자 하는 예측값에 대해서는 is_duplicate라는 키로 정의했기에 위와 같이 입력해서 유사도 예측값을 받는다.
이제 캐글에 제출한다.
import pandas as pd
output = pd.DataFrame( data={"test_id":test_id_data, "is_duplicate": list(predictions)} )
output.to_csv(DATA_OUT_PATH+"cnn_predict.csv", index=False, quoting=3)
XG 부스트 보다는 성능이 높지만 순위는 낮다 이는 아주 간단한 형태로만 구현했기 때문이고 합성곱을 깊게 하거나 부가적인 기법을 적용하면 더 올라간다고 한다.
[ 출처 : 책 ( 텐서플로와 머신러닝으로 시작하는 자연어 처리) ]
[ NLP ] 텍스트 유사도 (3) (2) | 2021.03.09 |
---|---|
[ NLP ] 텍스트 유사도 (2) (0) | 2021.02.24 |
[ NLP ] 텍스트 유사도 (1) (1) | 2021.01.30 |
[ NLP ] 한글 텍스트 분류 (3) (0) | 2021.01.21 |
[ NLP ] 한글 텍스트 분류 (2) (0) | 2021.01.13 |