seq2seq를 이용한 번역기 만들기
데이터 불러오기
우선 번역기를 만드려면 두개의 언어가 병렬적으로 구성된 데이터가 필요하다. 오늘 사용할 데이터는 ‘프랑스어-영어’ 로 구성된 데이터를 사용하겠다.
데이터는 ( 에서 받을 수 있다.
압축을 풀면 파일이 2개 나오는데 fra.txt
파일만 사용한다.
import tensorflow as tf
from tensorflow.keras.layers import Embedding, GRU, Dense
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
import unicodedata
import numpy as np
import pandas as pd
import re
2023-02-26 15:33:32.367543: I tensorflow/stream_executor/platform/default/] Successfully opened dynamic library
파일 구성은 source인 영어문장과 target인 프랑스문장으로 이루어져 있으며, lic는 license(?)를 뜻하는 것 같다. 여기서는 source와 target 데이터만 사용한다.
lines = pd.read_csv('./fra.txt', names=['src', 'tar', 'lic'], sep='\t')
print('전체 샘플의 개수 :',len(lines))
전체 샘플의 개수 : 208906
src | tar | lic | |
131326 | It fell short of my expectation. | Ce ne fut pas à la mesure de mes attentes. | CC-BY 2.0 (France) Attribution: #4... |
137695 | I've been feeling bad for a week. | Je me suis senti mal pendant une semaine. | CC-BY 2.0 (France) Attribution: #4... |
46110 | More money is needed. | Il faut plus d'argent. | CC-BY 2.0 (France) Attribution: #8... |
41300 | What's your problem? | C'est quoi ton problème ? | CC-BY 2.0 (France) Attribution: #3... |
169231 | This page was intentionally left blank. | Cette page a été laissée vide intentionnellement. | CC-BY 2.0 (France) Attribution: #7... |
데이터 전처리
def preprocess_sentence(sentence):
# 악센트 제거
s = ''.join(c for c in unicodedata.normalize('NFD', sentence.lower()) if unicodedata.category(c) != "Mn")
# 단어와 구두점 사이에 공백추가
s = re.sub(r"([?.!,¿])", r" \1", s)
# a-z, A-Z, . ! ? , 을 제외하고 전부 공백으로 변환
s = re.sub(r"[^a-zA-Z!.?,']+", r" ", s)
# 다수의 공백을 하나의 공백으로 변환
s = re.sub(r"\s+", " ", s)
return s
# 전처리 테스트
en_sent = u"Have you had dinner?"
fr_sent = u"Avez-vous déjà diné?"
print('전처리 전 영어 문장 :', en_sent)
print('전처리 후 영어 문장 :',preprocess_sentence(en_sent))
print('전처리 전 프랑스어 문장 :', fr_sent)
print('전처리 후 프랑스어 문장 :', preprocess_sentence(fr_sent))
전처리 전 영어 문장 : Have you had dinner?
전처리 후 영어 문장 : have you had dinner ?
전처리 전 프랑스어 문장 : Avez-vous déjà diné?
전처리 후 프랑스어 문장 : avez vous deja dine ?
전체 데이터 중에서 100000개만 사용하겠다. 또한 교사 강요법을 사용할 예정이므로, 훈련시 사용할 디코더의 입력 시퀀스에는 sos토큰을 추가하고 출력 시퀀스에는 eos토큰을 추가한다.
num_samples = 100000
def load_preprocessed_data():
encoder_input, decoder_input, decoder_target = [], [], []
with open("./fra.txt", "r") as lines:
for i, line in enumerate(lines):
# print(f"원문: {line}")
src_line, tar_line, _ = line.strip().split('\t') # 원본문장과 번역문장 데이터로 분리
# print(f"원본문장: {src_line}")
# print(f"번역문장: {tar_line}")
src_line = [w for w in preprocess_sentence(src_line).split()] # 인코더의 입력으로 들어갈 원본문장 전처리
# print(f"인코더입력: {src_line}")
tar_line = preprocess_sentence(tar_line) # 디코더에 사용될 번역문장 전처리
tar_line_in = [w for w in ("<sos> " + tar_line).split()] # 디코더의 입력으로 들어갈 문장에 <sos>토큰을 포함
# print(f"디코더입력: {tar_line_in}")
tar_line_out = [w for w in (tar_line + " <eos>").split()] # 디코더의 정답으로 쓰일 문장에 <eos>토큰을 포함
# print(f"디코더레이블: {tar_line_out}")
if i == num_samples - 1:
return encoder_input, decoder_input, decoder_target
원문: Go. Va ! CC-BY 2.0 (France) Attribution: #2877272 (CM) & #1158250 (Wittydev)
원본문장: Go.
번역문장: Va !
인코더입력: ['go', '.']
디코더입력: ['<sos>', 'va', '!']
디코더레이블: ['va', '!', '<eos>']
각각 5개씩 샘플을 출력해보자.
en_in, de_in, de_tar = load_preprocessed_data()
print('인코더의 입력 :',en_in[:5])
print('디코더의 입력 :',de_in[:5])
print('디코더의 레이블 :',de_tar[:5])
인코더의 입력 : [['go', '.'], ['go', '.'], ['go', '.'], ['go', '.'], ['hi', '.']]
디코더의 입력 : [['<sos>', 'va', '!'], ['<sos>', 'marche', '.'], ['<sos>', 'en', 'route', '!'], ['<sos>', 'bouge', '!'], ['<sos>', 'salut', '!']]
디코더의 레이블 : [['va', '!', '<eos>'], ['marche', '.', '<eos>'], ['en', 'route', '!', '<eos>'], ['bouge', '!', '<eos>'], ['salut', '!', '<eos>']]
인코더에 들어갈 데이터를 토큰화 한다.
en_tokenizer = Tokenizer(filters="", lower=False)
# 인코더 단어 집합을 정수 인코딩
# 텍스트 안의 단어를 정수 인코딩으로 매핑
encoder_input = en_tokenizer.texts_to_sequences(en_in)
# 패딩 설정
encoder_input = pad_sequences(encoder_input, padding='post')
디코더에 들어갈 데이터를 토큰화 한다. 디코더에는 input과 output 두개가 있다.
훈련 과정에서는 이전 시점의 디코더 셀을 현재 시점의 디코더 셀의 입력으로 넣지 않고 실제값을 넣는 방법을 사용한다. 그 이유는 만약 이전 시점의 디코더 셀의 예측이 틀렸는데 틀린 값을 다음으로 사용하면 연쇄적으로 틀릴 가능성이 있기 때문이다.
# 디코더 토크나이저
de_tokenizer = Tokenizer(filters="", lower=False)
# 디코더입력 단어 집합을 정수 인코딩
# 디코더출력 단어 집합을 정수 인코딩
# 디코더 입력 단어를 정수 인코딩으로 매핑
decoder_input = de_tokenizer.texts_to_sequences(de_in)
# 패딩 설정
decoder_input = pad_sequences(decoder_input, padding="post")
# 디코더 출력 단어를 정수 인코딩으로 매핑
decoder_target = de_tokenizer.texts_to_sequences(de_tar)
# 패딩 설정
decoder_target = pad_sequences(decoder_target, padding="post")
print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)
인코더의 입력의 크기(shape) : (100000, 10)
디코더의 입력의 크기(shape) : (100000, 17)
디코더의 레이블의 크기(shape) : (100000, 17)
src_vocab_size = len(en_tokenizer.word_index) + 1
tar_vocab_size = len(de_tokenizer.word_index) + 1
print(f"영어 단어 집합의 크기 : {src_vocab_size} \n프랑스어 단어 집합의 크기 : {tar_vocab_size}")
영어 단어 집합의 크기 : 9065
프랑스어 단어 집합의 크기 : 16383
decoder_input의 맨앞 2는 sos토큰이고 decoder_target의 맨 뒤 3은 eos토큰을 나타낸다. sos토큰과 eos토큰을 뺀 나머지는 같아야 한다.
[ 2 172 14 236 674 395 15 5 0 0 0 0 0 0 0 0 0]
[172 14 236 674 395 15 5 3 0 0 0 0 0 0 0 0 0]
단어로부터 정수를 얻는 딕셔너리와 정수로부터 단어를 얻는 딕셔너리를 만든다.
이것들은 예측값과 실제값을 비교할 때 사용된다.
# 단어 -> 정수
src_to_index = en_tokenizer.word_index
tar_to_index = de_tokenizer.word_index
# 정수 -> 단어
index_to_src = en_tokenizer.index_word
index_to_tar = de_tokenizer.index_word
데이터 분리
데이터를 랜덤으로 셔플한다.
# 데이터 셔플
indices = np.arange(encoder_input.shape[0])
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]
데이터 100000개 중 30%를 테스트 데이터로 사용한다.
n_of_val = int(num_samples*0.3)
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]
encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]
print('훈련 source 데이터의 크기 :',encoder_input_train.shape)
print('훈련 target 데이터의 크기 :',decoder_input_train.shape)
print('훈련 target 레이블의 크기 :',decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)
훈련 source 데이터의 크기 : (70000, 10)
훈련 target 데이터의 크기 : (70000, 17)
훈련 target 레이블의 크기 : (70000, 17)
테스트 source 데이터의 크기 : (30000, 10)
테스트 target 데이터의 크기 : (30000, 17)
테스트 target 레이블의 크기 : (30000, 17)
모델 설계
functional API를 사용하여 모델을 설계한다. Sequential 모델로 만드는것 보다 자유로운 커스텀이 가능한 것이 장점이다.
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Masking
from tensorflow.keras.models import Model
인코더 모델을 설계한다. 각 라인의 코드마다 설명을 해놓았다.
- 인코더 input layer 설정(None은 임의의 스칼라가 될 수 있음을 명시. 테스트 중에 입력shape을 자유롭게 선택가능)
- input layer에서 온 데이터를 64차원으로 임베딩
- Masking layer는 임베딩 벡터에서 온 값 중, 패딩 0은 연산에서 제외하는 역할을 함
- hidden units가 64인 LSTM layer 생성. 인코더의 hidden state를 디코더에 넘겨야 하기 때문에
로 설정. - 생성한 LSTM에 Masking layer에서 계산된 데이터를 넣으면 3개의 값을 리턴함.
- encoder_outputs은 필요 없으니 사용하지 않고 hidden state와 cell state를 저장. 이것이 컨텍스트 벡터이고, 디코더에 전달됨
encoder_inputs = Input(shape=(None,))
enc_emb = Embedding(src_vocab_size, 64)(encoder_inputs)
enc_masking = Masking(mask_value=0.0)(enc_emb)
encoder_lstm = LSTM(64, return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(enc_masking)
encoder_states = [state_h, state_c]
디코더 모델을 설계한다.
- 디코더 input layer 설정
- input layer에서 온 데이터를 64차원으로 임베딩
- Masking layer는 임베딩 벡터에서 온 값 중, 패딩 0은 연산에서 제외하는 역할을 함
- hidden units이 64인 LSTM을 생성. 상태값과 모든 시점에 대한 값이 필요하므로
로 설정 - 생성된 LSTM에 Masking layer에서 계산된 데이터를 넣고, 초기상태를 인코더의 컨텍스트 벡터로 사용
- 모든 단어집합에 대해 예측해야 하므로 tar_vocab_size만큼 Dense unit 설정. 활성화 함수는 softmax를 사용하여 모든 단어집합에 대한 확률분포를 계산함
- Dense layer에 LSTM에서 나온 output을 연결
decoder_inputs = Input(shape=(None,))
dec_emb = Embedding(tar_vocab_size, 64)(decoder_inputs)
dec_masking = Masking(mask_value=0.0)(dec_emb)
decoder_lstm = LSTM(64, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(dec_masking, initial_state=encoder_states)
decoder_dense = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)
모델의 입출력을 정의한다. 입력은 encoder_inputs
, decoder_inputs
이고, 출력은 decoder_outputs
이 된다.
모델 구조를 보면 이해가 될것이다.
중요한점은 ‘Layer (type)’에서 디코더의 lstm_1이 인코더의 lstm에서 나온 값 lstm[0][1]
, lstm[0][2]
를 받는다는 것이다.
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])
Model: "model"
Layer (type) Output Shape Param # Connected to
input_1 (InputLayer) [(None, None)] 0
input_2 (InputLayer) [(None, None)] 0
embedding (Embedding) (None, None, 64) 580160 input_1[0][0]
embedding_1 (Embedding) (None, None, 64) 1048512 input_2[0][0]
masking (Masking) (None, None, 64) 0 embedding[0][0]
masking_1 (Masking) (None, None, 64) 0 embedding_1[0][0]
lstm (LSTM) [(None, 64), (None, 33024 masking[0][0]
lstm_1 (LSTM) [(None, None, 64), ( 33024 masking_1[0][0]
dense (Dense) (None, None, 16383) 1064895 lstm_1[0][0]
Total params: 2,759,615
Trainable params: 2,759,615
Non-trainable params: 0
모델 학습
x에는 encoder_input_train
, decoder_input_train
을 넣고, y에는 target인 decoder_target_train
을 넣는다.
RTX 3090ti 기준으로 13분 정도 걸렸다.[encoder_input_train, decoder_input_train], y=decoder_target_train,
validation_data=([encoder_input_test, decoder_input_test], decoder_target_test),
batch_size=128, epochs=50)
Epoch 50/50
547/547 [==============================] - 16s 29ms/step - loss: 0.2761 - acc: 0.9302 - val_loss: 0.8073 - val_acc: 0.8627
예측을 위한 모델 재정의
seq2seq는 훈련과정과 예측과정에서의 동작 방식이 다르다. 훈련 과정에서는 decoder_input
으로 정답 데이터가 들어갔지만 예측과정에서는 sos토큰 하나만 들어가기 때문이다.
전체적인 예측과정을 정리하자면,
- 번역하고싶은 문장이 인코더로 입력되어 컨텍스트 벡터를 생성
- 인코더의 컨텍스트 벡터와 sos토큰을 디코더로 보냄
- 디코더는 eos토큰이나 정해진 길이의 문자가 나올 때 까지 단어를 예측
그래서 훈련때 사용한 모델을 조금 수정시켜야 한다.
인코더는 수정할 필요가 없으므로 훈련 때 사용했던 인코더 모델을 그대로 사용한다. 따라서 인코더의 모든 부분은 학습되어 있는 상태이다.
# 인코더
encoder_model = Model(encoder_inputs, encoder_states)
예측 단계에서 디코더는 매 시점별로 다뤄야 한다. 따라서 이전 시점을 다뤄야 하는 decoder_state_input_h
와 decoder_state_input_c
를 정의한다.
# 디코더
decoder_state_input_h = Input(shape=(64,))
decoder_state_input_c = Input(shape=(64,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
훈련 때 사용했던 임베딩 층을 재사용한다.
dec_emb2 = dec_emb
다음 단어 예측을 위해 이전 시점의 상태를 현 시점의 초기 상태로 사용한다. 훈련 때에는 디코더의 초기 상태를 컨텍스트 벡터로 했던점과 다르다.
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]
모든 시점에 대해서 단어를 예측한다. decoder_dense
는 훈련 때 학습이 되었다.
decoder_outputs2 = decoder_dense(decoder_outputs2)
마지막으로 디코더 모델을 재정의 한다.
입력은 [decoder_inputs]
과 decoder_states_inputs
값을 합친 것이다.
여기서 [decoder_inputs]
는 디코더의 입력 시퀀스이고 decoder_states_inputs
은 위에서 정의한 hidden state와 cell state를 나타내는 것이다.
출력은 [decoder_outputs2]
와 decoder_states2
을 합친 것이다.
은 lstm을 통해서 나온 결과이고 decoder_states2
는 hidden state와 cell state를 나타낸다.
decoder_model = Model(
[decoder_inputs] + decoder_states_inputs,
[decoder_outputs2] + decoder_states2)
개념이 약간 어려울 것이다. 이제 예측하는 코드를 하나하나 풀어보면서 어떻게 돌아가는지 살펴보겠다. 일단 코드는 pseudo-code이므로 작동되지 않는다. 마지막에 전체 코드를 올려두겠다!
states_value = encoder_model.predict(input_seq)
학습된 encoder_model
에 input_seq
를 넣어서 states_value
를 얻어낸다.
여기서 input_seq
는 번역을 위한 문장이다.
은 출력값으로 encoder_states
을 내보내는데, 이것은 state_h
, state_c
의 값이다.
target_seq = np.zeros((1,1))
target_seq[0, 0] = tar_to_index['<sos>']
이 코드는 디코더의 첫 입력으로 들어갈 SOS토큰을 생성하는 과정이다.
stop_condition = False
decoded_sentence = ''
은 원하는 조건이 나오면 True로 설정된다. False인 동안에는 계속 반복하여 예측을 진행한다.
는 최종적으로 반환될 예측이 완료된 문장이다.
output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
여기서부터 중요하다. 우선 맨 처음 반복때, decoder_model
에 SOS토큰과 인코더에서 나온 states_value
를 넣는다.
위의 코드에서 states_value
는 decoder_model
의 decoder_states_inputs
에 해당하므로 모델을 호출했을 때, states_value
의 값은
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
으로 할당된다.
그러면 아까 재정의한 디코더 모델에서 이 decoder_states_inputs
을 initial_state
으로 사용하여 decoder_outputs2
과 [state_h2, state_c2]
을 반환한다.
sampled_token_index = np.argmax(output_tokens[0, -1, :])
sampled_char = index_to_tar[sampled_token_index]
이 부분은 디코더 모델에서 나온 output_tokens
을 단어로 변환하는 과정이다.
이 output_tokens
는 다음을 예측할 때, 디코더 모델의 [target_seq]
부분으로 들어가게 된다.
target_seq = np.zeros((1,1))
target_seq[0, 0] = sampled_token_index
이런식으로 말이다.
states_value = [h, c]
이 부분은 디코더 모델에서 나온 h, c를 states_value
에 저장한다.
그리고 EOS토큰이 나올 때 까지 값을 갱신하면서 반복한다.
대충 감이 오는가? 2번째 반복부터는, SOS토큰과 인코더의 컨텍스트 벡터가 들어가는게 아니라, 예측을 하고 나온 시퀀스와 상태값들로 반복한다.
결국 맨 처음 번역할 문장이 인코더로 들어오면 컨텍스트 벡터가 생성이 되고, SOS토큰과 함께 첫 번째 예측을 한다. 그 다음부터는 결과값들로만 예측을 진행한다.
즉, 컨텍스트 벡터를 한번 넣었을 때, 그 값을 기준으로 계속 예측을 하기 때문에 컨텍스트 벡터가 얼마나 잘 학습되었는지가 중요하다.
아래에는 예측을 진행하는 총 코드이다.
def decode_sequence(input_seq):
# 입력으로부터 인코더의 마지막 시점의 상태(은닉 상태, 셀 상태)를 얻음
states_value = encoder_model.predict(input_seq)
# <SOS>에 해당하는 정수 생성
target_seq = np.zeros((1,1))
target_seq[0, 0] = tar_to_index['<sos>']
stop_condition = False
decoded_sentence = ''
# stop_condition이 True가 될 때까지 루프 반복
while not stop_condition:
output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
# 예측 결과를 단어로 변환
sampled_token_index = np.argmax(output_tokens[0, -1, :])
sampled_char = index_to_tar[sampled_token_index]
# 현재 시점의 예측 단어를 예측 문장에 추가
decoded_sentence += ' '+sampled_char
# <eos>에 도달하거나 정해진 길이를 넘으면 중단.
if (sampled_char == '<eos>' or
len(decoded_sentence) > 50):
stop_condition = True
# 현재 시점의 예측 시퀀스를 다음 시점의 입력으로 사용하기 위해 저장
target_seq = np.zeros((1,1))
target_seq[0, 0] = sampled_token_index
# 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
states_value = [h, c]
return decoded_sentence
결과 확인을 위한 함수를 만든다.
# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
sentence = ''
for encoded_word in input_seq:
if(encoded_word != 0):
sentence = sentence + index_to_src[encoded_word] + ' '
return sentence
# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
sentence = ''
for encoded_word in input_seq:
if(encoded_word != 0 and encoded_word != tar_to_index['<sos>'] and encoded_word != tar_to_index['<eos>']):
sentence = sentence + index_to_tar[encoded_word] + ' '
return sentence
테스트 데이터에 대해서 임의의 인덱스로 결과를 출력해 보았다.
for seq_index in [3, 50, 100, 300, 1001]:
input_seq = encoder_input_test[seq_index: seq_index + 1]
decoded_sentence = decode_sequence(input_seq)
print("입력문장 :",seq_to_src(encoder_input_test[seq_index]))
print("정답문장 :",seq_to_tar(decoder_input_test[seq_index]))
print("번역문장 :",decoded_sentence[1:-5])
입력문장 : i like it .
정답문장 : j'aime bien .
번역문장 : j'aime le .
입력문장 : we did that yesterday .
정답문장 : nous avons fait ca hier .
번역문장 : nous avons fait plein de temps .
입력문장 : i figured i'd find you here .
정답문장 : je me suis imaginee que je te trouverais ici .
번역문장 : je me suis imaginee que je vous trouverais la .
입력문장 : i always believe you .
정답문장 : je vous crois toujours .
번역문장 : je vous crois toujours .
입력문장 : who cares about facts ?
정답문장 : qui se soucie des faits ?
번역문장 : qui vont tout ce ?
현재 셀 또는 이전 셀에서 코드를 실행하는 동안 Kernel이 충돌했습니다. 셀의 코드를 검토하여 오류의 가능한 원인을 식별하세요. 자세한 내용을 보려면 <a href=''> 여기 </a> 를 클릭하세요. 자세한 내용은 Jupyter <a href='command:jupyter.viewOutput'>로그</a>를 참조하세요.
어느정도 잘 예측하는 것 같다. 다음에는 attention을 이용하여 더 정교하게 만들어보자.