본 포스팅은 지금 하고 있는 연구가 Sentence_transformers 라이브러리의 모델에서 cross-attention을 사용할 수 없어서 transformers 라이브러리의 모델로 변환하는 과정을 담고 있습니다. 도움이 되셨으면 좋겠습니다. ㅎㅎ..

모델과 토크나이저 로드

q_encoder = AutoModel.from_pretrained("klue/roberta-base")
p_encoder = AutoModel.from_pretrained("klue/roberta-base")

q_encoder와 p_encoder를 선언한다.

tokenizer = AutoTokenizer.from_pretrained("klue/roberta-base")

토크나이저도 추가한다.

데이터셋 선언

sts_train_dataset = sts_processing("../data/kor_nli_sts/KorSTS/sts-train.tsv")
sts_valid_dataset = sts_processing("../data/kor_nli_sts/KorSTS/sts-dev.tsv")
sts_test_dataset = sts_processing("../data/kor_nli_sts/KorSTS/sts-test.tsv")

데이터셋을 추가한다. 데이터셋은 여기 에서 받을 수 있다.
sts_processing 함수는 그냥 NAN 값을 제거하고 score 부분을 0~1로 정규화 하여 Dataset 객체로 반환한다.

커스텀 모델 선언

model = DualEncoderModel(q_encoder, p_encoder, tokenizer)

모델 부분이다. 허깅페이스에서는 듀얼 인코더 모델이 없는 것 같아서 만들어 보았다. 모델 호출은 q_encoder와 p_encoder, tokenizer를 필요로 한다.

class HuggingfaceBaseDualEncoderModel(nn.Module):
    def __init__(self, q_encoder, p_encoder, tokenizer):
        super(HuggingfaceBaseDualEncoderModel, self).__init__()
        self.q_encoder = q_encoder
        self.p_encoder = p_encoder
        self.pooler = Pooling(word_embedding_dimension=768)
        self.tokenizer = tokenizer

    def forward(self, sen1_features=None, sen2_features=None, labels=None):
        pass

모델은 일단 torch.nn.Module을 상속받아서 Base 모델을 선언하였다. 굳이 이렇게 하지 않아도 되지만 현재 하고 있는 연구가 Base 모델 클래스를 기본으로 파생해서 만들어야 하기 때문이다.

class DualEncoderModel(HuggingfaceBaseDualEncoderModel):
    def forward(self, sen1_features=None, sen2_features=None, labels=None):
        q_embeddings = self.q_encoder(**sen1_features)
        q_embeddings = self.pooler(q_embeddings, sen1_features)
        p_embeddings = self.p_encoder(**sen2_features)
        p_embeddings = self.pooler(p_embeddings, sen2_features)

        # loss
        a = torch.nn.functional.normalize(q_embeddings["sentence_embedding"], p=2, dim=1)
        b = torch.nn.functional.normalize(p_embeddings["sentence_embedding"], p=2, dim=1)
        dot_score = (a * b).sum(dim=-1)
        scores = dot_score * 20
        scores = scores[:, None] - scores[None, :]

        # label matrix indicating which pairs are relevant
        labels = labels[:, None] < labels[None, :]
        labels = labels.float()

        # mask out irrelevant pairs so they are negligible after exp()
        scores = scores - (1 - labels) * 1e12

        # append a zero as e^0 = 1
        scores = torch.cat((torch.zeros(1).to(scores.device), scores.view(-1)), dim=0)
        # scores에서 labels가 0인 경우 log를 취했을 때 nan값이 됨. 따라서 labels가 1인 경우의 score에만 log를 취해서 전부 더함
        loss = torch.logsumexp(scores, dim=0)

        output = {"loss": loss}
        return output

코드가 좀 지저분 하긴 하지만, loss 클래스를 따로 만들지 않고 Model에 만들었다. loss는 CosENTLoss()를 사용하였다.
기본적으로 q_encoder, p_encoder 모델은 RoBERTa 모델이고 forward의 input으로 “input_ids”, “attention_mask”, “token_type_ids” 를 받는다. sen1_features, sen2_features는 딕셔너리 형태로 되어 있고, 이따 아래에서 설명할 data collator에 의해 생성된다.
pooler 부분은 sentence_transformer 라이브러리에서 지원하는 Pooling 클래스 모듈이다. sentence_transformer 라이브러리를 안쓴 이유는 추후 연구 진행에 막힘이 있을 것 같아서 transformer 라이브러리를 직접적으로 사용하였다. sentence_transformers.models.Pooling()을 오버라이딩 하여 forward 부분을 아래처럼 수정하였다.

# 기본 Pooling의 forward
def forward(self, features: dict[str, Tensor]) -> dict[str, Tensor]:
    token_embeddings = features["token_embeddings"]

    
# 수정한 forward
def forward(self, outputs: BaseModelOutputWithPoolingAndCrossAttentions, features: dict[str, Tensor]) -> dict[str, Tensor]:
    features["token_embeddings"] = outputs["last_hidden_state"]
    token_embeddings = features["token_embeddings"]

TrainingArguments 설정

training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=5,
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        learning_rate=2e-5,
        warmup_ratio=0.1,
        optim='adamw_hf',
        fp16=True,
        bf16=False,
        eval_strategy="epoch",
        eval_steps=100,
        save_strategy="epoch",
        save_steps=100,
        save_total_limit=2,
        logging_steps=10,
        weight_decay=0.1,
        lr_scheduler_type="linear",
        max_grad_norm=1,
        remove_unused_columns=False,
        **(
            {
                "load_best_model_at_end": True,
                "metric_for_best_model": "eval_loss",
                "greater_is_better": False
            } if early_stopping else {}
        )
    )

Trainer에 필요한 args를 설정한다. 사전에 early_stopping 이라는 bool 변수를 두고 True 라면 아래 인자가 추가되게 만들었다.

Trainer 설정

trainer = CustomTrainer(
        model=model,
        args=training_args,
        train_dataset=sts_train_dataset,
        eval_dataset=sts_valid_dataset,
        data_collator=HuggingfaceDataCollator(tokenizer),
        **(
            {
                "callbacks": [EarlyStoppingCallback(early_stopping_patience=2)] if early_stopping else []
            }
        ) if early_stopping else {}
    )

원래 기본 data collator를 사용하려고 했으나 의문의(?) 오류가 발생해서 커스텀 data collator를 선언하였다.

@dataclass
class HuggingfaceDataCollator:
    tokenize_fn: Callable

    def __call__(self, features: list[dict[str, Any]]) -> dict[str, Any]:
        # Correct tokenize function usage
        sen1_features = self.tokenize_fn([feature["sentence1"] for feature in features], padding=True, truncation=True,
                                         return_tensors="pt")
        sen2_features = self.tokenize_fn([feature["sentence2"] for feature in features], padding=True, truncation=True,
                                         return_tensors="pt")
        labels = torch.tensor([feature["score"] for feature in features])

        # Return formatted inputs for the model
        return {
            "sen1_features": {key: val for key, val in sen1_features.items()},
            "sen2_features": {key: val for key, val in sen2_features.items()},
            "labels": labels,
        }

transformers 라이브러리에서 커스텀으로 collator를 설정하면 dataloader를 불러올 때 커스텀한 collator를 적용해서 데이터를 불러온다. 원래 dataloader를 오버라이딩해서 구현하려고 했지만 너무 헷갈려서 그냥 collator를 수정하였다.
여기서 features는 배치 64만큼의 리스트로 되어있고 하나의 요소는 {“score”: ~~, “sentence1”: ~~, “sentence2”: ~~} 같이 딕셔너리로 구성되어 있다.
tokenize_fn은 토크나이저 함수이며 토크나이징을 통해 BERT에 맞게 [CLS] 토큰과 [SEP] 토큰을 추가한다.
sen1_features와 sen2_features는 토크나이징을 통해 “input_ids”, “attention_mask”, “token_type_ids”가 자동으로 만들어 진다. return은 마찬가지로 dict 형태로 반환한다.

Trainer 부분이다. 학습 중간에 validation step이 이루어 지지 않아서 Trainer 클래스의 compute_loss() 메서드를 오버라이딩하여 아래와 같이 수정하였다.

class CustomTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        # 모델 입력을 분리
        sen1_features = inputs.pop("sen1_features")
        # collator 에서 딕셔너리 형태로 반환
        sen2_features = inputs.pop("sen2_features")
        labels = inputs.pop("labels")

        # 모델에 forward 호출
        outputs = model(
            sen1_features=sen1_features,
            sen2_features=sen2_features,
            labels=labels
        )
        loss = outputs["loss"]  # loss 가져오기
        return (loss, outputs) if return_outputs else loss

compute_loss() 메서드에서 model은 DualEncoderModel 모델이고 inputs로는 위의 data collator를 통해서 나온 리턴값이 입력으로 들어온다.
모델을 호출할 때 model의 forward에 맞게 sen1_features, sen2_features, labels로 나누어서 들어간다. sen1_features, sen2_features는 각각 “input_ids”, “attention_mask”, “token_type_ids” 가 들어있다.

Training

trainer.train()

train()을 통해 학습을 시작한다. H100 1개 기준 5분 걸렸다.

Evaluating

이제 모델을 평가할 차례이다. 모델 평가는 sentence_transformers.evaluation.EmbeddingSimilarityEvaluator를 오버라이딩 해서 사용하였다.
이 클래스도 sentence_transformer 모델에 특화되어 있어서 그냥 transformers 라이브러리를 사용하는 나에게는 동작하지 않았다.
따라서 def __call__() 부분만 아래와 같이 수정하였다.

class HuggingfaceEmbeddingSimilarityEvaluator(EmbeddingSimilarityEvaluator):
    def __call__(self, model, output_path=None, epoch=-1, steps=-1):
        import numpy as np
        from sklearn.metrics.pairwise import paired_cosine_distances, paired_euclidean_distances, \
            paired_manhattan_distances
        from scipy.stats import pearsonr, spearmanr
        import csv
        if epoch != -1:
            if steps == -1:
                out_txt = f" after epoch {epoch}"
            else:
                out_txt = f" in epoch {epoch} after {steps} steps"
        else:
            out_txt = ""
        if self.truncate_dim is not None:
            out_txt += f" (truncated to {self.truncate_dim})"
    
        print(f"EmbeddingSimilarityEvaluator: Evaluating the model on the {self.name} dataset{out_txt}:")
    
        model.eval()
        with torch.no_grad():
            q_encoder = CustomEncoder(model=model.q_encoder, pooler=model.pooler, tokenizer=model.tokenizer)
            p_encoder = CustomEncoder(model=model.p_encoder, pooler=model.pooler, tokenizer=model.tokenizer)
            embeddings1 = q_encoder.encode(
                self.sentences1,
                batch_size=self.batch_size,
                show_progress_bar=True,
                convert_to_numpy=True,
                normalize_embeddings=False,
                precision="float32"
    
            )
            embeddings2 = p_encoder.encode(
                self.sentences2,
                batch_size=self.batch_size,
                show_progress_bar=True,
                convert_to_numpy=True,
                normalize_embeddings=False,
                precision="float32"
            )

이후 코드는 원래 클래스와 동일하다. 여기서 문제는 encoder 모델이 huggingface AutoModel 이기 때문에 encoder() 함수가 없다.
(sentence_transformers에는 있음) 따라서 sentence_transformer 라이브러리의 encoder() 함수를 참고하여 만들었다.

class CustomEncoder:
    def __init__(self, model, pooler, tokenizer):
        self.model = model
        self.pooler = pooler
        self.tokenizer = tokenizer
        self.device = self.model.device
        self.model.to(self.device)

    def encode(self,
               sentences,
               prompt_name=None,
               prompt=None,
               batch_size=32,
               show_progress_bar=True,
               output_value="sentence_embedding",
               precision="float32",
               convert_to_numpy=True,
               convert_to_tensor=False,
               device=None,
               normalize_embeddings=False,
               ):
        import numpy as np
        from tqdm.autonotebook import trange

        self.model.eval()
        input_was_string = False
        if isinstance(sentences, str) or not hasattr(
                sentences, "__len__"
        ):
            sentences = [sentences]
            input_was_string = True
        extra_features = {}
        all_embeddings = []
        length_sorted_idx = np.argsort([-self._text_length(sen) for sen in sentences])
        sentences_sorted = [sentences[idx] for idx in length_sorted_idx]
        for start_index in trange(0, len(sentences), batch_size, desc="Batches", disable=not show_progress_bar):
            sentences_batch = sentences_sorted[start_index: start_index + batch_size]
            features = self.tokenizer(
                sentences_batch,
                padding=True,
                truncation=True,
                return_tensors="pt",
            ).to(self.device)
            features = features.data
            features = self._batch_to_device(features, device)
            features.update(extra_features)

            with torch.no_grad():
                out_features = self.model(**features)
                out_features = self.pooler(out_features, features)
                out_features["sentence_embedding"] = self._truncate_embeddings(
                    out_features["sentence_embedding"], None
                )

                if output_value == "token_embeddings":
                    embeddings = []
                    for token_emb, attention in zip(out_features[output_value], out_features["attention_mask"]):
                        last_mask_id = len(attention) - 1
                        while last_mask_id > 0 and attention[last_mask_id].item() == 0:
                            last_mask_id -= 1

                        embeddings.append(token_emb[0: last_mask_id + 1])
                elif output_value is None:  # Return all outputs
                    embeddings = []
                    for sent_idx in range(len(out_features["sentence_embedding"])):
                        row = {name: out_features[name][sent_idx] for name in out_features}
                        embeddings.append(row)
                else:  # Sentence embeddings
                    embeddings = out_features[output_value]
                    embeddings = embeddings.detach()
                    if normalize_embeddings:
                        embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)

                    # fixes for #522 and #487 to avoid oom problems on gpu with large datasets
                    if convert_to_numpy:
                        embeddings = embeddings.cpu()

                all_embeddings.extend(embeddings)

        all_embeddings = [all_embeddings[idx] for idx in np.argsort(length_sorted_idx)]

        if convert_to_tensor:
            if len(all_embeddings):
                if isinstance(all_embeddings, np.ndarray):
                    all_embeddings = torch.from_numpy(all_embeddings)
                else:
                    all_embeddings = torch.stack(all_embeddings)
            else:
                all_embeddings = torch.Tensor()
        elif convert_to_numpy:
            if not isinstance(all_embeddings, np.ndarray):
                if all_embeddings and all_embeddings[0].dtype == torch.bfloat16:
                    all_embeddings = np.asarray([emb.float().numpy() for emb in all_embeddings])
                else:
                    all_embeddings = np.asarray([emb.numpy() for emb in all_embeddings])
        elif isinstance(all_embeddings, np.ndarray):
            all_embeddings = [torch.from_numpy(embedding) for embedding in all_embeddings]

        if input_was_string:
            all_embeddings = all_embeddings[0]

        return all_embeddings

    def _truncate_embeddings(self, embeddings, truncate_dim):
        return embeddings[..., :truncate_dim]

    def _batch_to_device(self, batch: dict[str, Any], target_device) -> dict[str, Any]:
        from torch import Tensor
        for key in batch:
            if isinstance(batch[key], Tensor):
                batch[key] = batch[key].to(target_device)
        return batch

    def _text_length(self, text) -> int:
        """
        Help function to get the length for the input text. Text can be either
        a list of ints (which means a single text as input), or a tuple of list of ints
        (representing several text inputs to the model).
        """

        if isinstance(text, dict):  # {key: value} case
            return len(next(iter(text.values())))
        elif not hasattr(text, "__len__"):  # Object has no len() method
            return 1
        elif len(text) == 0 or isinstance(text[0], int):  # Empty string or list of ints
            return len(text)
        else:
            return sum([len(t) for t in text])

이 코드에서 중요한것은 tokenizer를 통해서 나온 결과 값을 모델의 forward로 들어가는 형식과 맞춘것이다. 코드를 조금 더 간결하게 짤 수 있을 것 같은데 실력의 한계다..

Result

Metric Pearson Spearman
Cosine 0.8124762323646662 0.8082641269098775
Euclidean 0.8019528231023273 0.798158350294268
Manhattan 0.8001670570954755 0.7966446194852113
Dot 0.6849669599345047 0.6659398109956867

카테고리:

업데이트:

댓글남기기