본문 바로가기

스타트업 투자/데이터 + 테크

패스트캠퍼스 환급챌린지 41일차 : 주요코드 이해하기

by Sungwook Choi 2025. 5. 11.

본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성하였습니다.

*_1. 학습 인증샷 4장 이상 포함
*_① 오늘자 날짜, 공부 시작 시각 포함 사진 1장

② 오늘자 날짜, 공부 종료 시각 포함 사진 1장

③ 1개 클립 수강 인증 사진 (강의장 목록 캡쳐, 강의 내용이 담긴 수강화면이 보이지 않도록) 1장

④ 학습 인증샷 1장 이상 (ex. 필기 촬영, 작업물, 등)

2. 학습 후기 700자 이상 (공백 제외)

순서

1. 필요한 패키지 설치

2. 데이터 전처리

3. 모델 로드 및 템플릿 적용

4. LoRA와 SFTConfig 적용

5. 학습 중 전처리 함수 : collate_fn

6. 어텐션 마스크 확인

7. 전처리 이해하기

8. 학습하기

9. 테스트 데이터 준비하기

10. 파인튜닝 모델 테스트

11. 기본 모델 테스트

  1. 필요한 패키지설치
  2. %pip install "torch==2.4.0" %pip install "transformers==4.45.1" "datasets==3.0.1" "accelerate==0.34.2" "trl==0.11.1" "peft==0.13.0" from datasets import load_dataset, Dataset import torch from transformers import AutoModelForCausalLM, AutoTokenizer from peft import LoraConfig from trl import SFTConfig, SFTTrainer
  3. 데이터 전처리
  4. from datasets import load_dataset

1. 허깅페이스 허브에서 데이터셋 로드

dataset = load_dataset("iamjoon/finance_news_summarizer", split="train")

2. 전체 데이터 크기만 출력

print("전체 데이터 크기:", len(dataset))

3. train/test 분할 비율 설정 (0.5면 5:5로 분할)

test_ratio = 0.5

train_data = []
test_data = []

4. 전체 데이터의 인덱스를 train/test로 분할

data_indices = list(range(len(dataset)))
test_size = int(len(data_indices) * test_ratio)

test_data = data_indices[:test_size]
train_data = data_indices[test_size:]

5. OpenAI format으로 데이터 변환을 위한 함수

def format_data(sample):
return {
"messages": [
{
"role": "system",
"content": sample["system_prompt"],
},
{
"role": "user",
"content": sample["user_prompt"],
},
{
"role": "assistant",
"content": str(sample["assistant"])
},
],
}

6. 분할된 데이터를 OpenAI format으로 변환

train_dataset = [format_data(dataset[i]) for i in train_data]
test_dataset = [format_data(dataset[i]) for i in test_data]

7. 최종 데이터셋 크기 출력

print(f"\n전체 데이터 분할 결과: Train {len(train_dataset)}개, Test {len(test_dataset)}개")


3. 모델 로드 및 템플릿 적용
~~~모델 코드
# 허깅페이스 모델 ID
model_id = "NCSOFT/Llama-VARCO-8B-Instruct" 

# Load model and tokenizer
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.bfloat16,
)
tokenizer = AutoTokenizer.from_pretrained(model_id)

# 템플릿 적용
text = tokenizer.apply_chat_template(
    train_dataset[0]["messages"], tokenize=False, add_generation_prompt=False
)
print(text)
  1. LoRA와 SFTConfig 설정
    peft_config = LoraConfig(
         lora_alpha=32,
         lora_dropout=0.1,
         r=8,
         bias="none",
         target_modules=["q_proj", "v_proj"],
         task_type="CAUSAL_LM",
    )
    

args = SFTConfig(
output_dir="llama3-8b-summarizer-ko", # 저장될 디렉토리와 저장소 ID
num_train_epochs=3, # 학습할 총 에포크 수
per_device_train_batch_size=2, # GPU당 배치 크기
gradient_accumulation_steps=2, # 그래디언트 누적 스텝 수
gradient_checkpointing=True, # 메모리 절약을 위한 체크포인팅
optim="adamw_torch_fused", # 최적화기
logging_steps=10, # 로그 기록 주기
save_strategy="steps", # 저장 전략
save_steps=50, # 저장 주기
bf16=True, # bfloat16 사용
learning_rate=1e-4, # 학습률
max_grad_norm=0.3, # 그래디언트 클리핑
warmup_ratio=0.03, # 워밍업 비율
lr_scheduler_type="constant", # 고정 학습률
push_to_hub=False, # 허브 업로드 안 함
remove_unused_columns=False,
dataset_kwargs={"skip_prepare_dataset": True},
report_to=None
)


5. 학습중 전처리 함수
~~~모델 코드 3
def collate_fn(batch):
    new_batch = {
        "input_ids": [],
        "attention_mask": [],
        "labels": []
    }

    for example in batch:
        messages = example["messages"]

        # LLaMA 3 채팅 템플릿 적용 (시작 토큰 포함)
        prompt = "<|begin_of_text|>"
        for msg in messages:
            role = msg["role"]
            content = msg["content"].strip()
            prompt += f"<|start_header_id|>{role}<|end_header_id|>\n{content}<|eot_id|>"

        # 마지막 assistant 메시지는 응답으로 간주하고 레이블에 포함
        text = prompt.strip()

        # 토큰화
        tokenized = tokenizer(
            text,
            truncation=True,
            max_length=max_seq_length,
            padding=False,
            return_tensors=None,
        )

        input_ids = tokenized["input_ids"]
        attention_mask = tokenized["attention_mask"]
        labels = [-100] * len(input_ids)

        # assistant 응답의 시작 위치 찾기
        assistant_header = "<|start_header_id|>assistant<|end_header_id|>\n"
        assistant_tokens = tokenizer.encode(assistant_header, add_special_tokens=False)
        eot_token = "<|eot_id|>"
        eot_tokens = tokenizer.encode(eot_token, add_special_tokens=False)

        # 레이블 범위 지정
        i = 0
        while i <= len(input_ids) - len(assistant_tokens):
            if input_ids[i:i + len(assistant_tokens)] == assistant_tokens:
                start = i + len(assistant_tokens)
                end = start
                while end <= len(input_ids) - len(eot_tokens):
                    if input_ids[end:end + len(eot_tokens)] == eot_tokens:
                        break
                    end += 1
                for j in range(start, end):
                    labels[j] = input_ids[j]
                for j in range(end, end + len(eot_tokens)):
                    labels[j] = input_ids[j]  # <|eot_id|> 토큰도 포함
                break
            i += 1

        new_batch["input_ids"].append(input_ids)
        new_batch["attention_mask"].append(attention_mask)
        new_batch["labels"].append(labels)

    # 패딩 처리
    max_length = max(len(ids) for ids in new_batch["input_ids"])
    for i in range(len(new_batch["input_ids"])):
        pad_len = max_length - len(new_batch["input_ids"][i])
        new_batch["input_ids"][i].extend([tokenizer.pad_token_id] * pad_len)
        new_batch["attention_mask"][i].extend([0] * pad_len)
        new_batch["labels"][i].extend([-100] * pad_len)

    for k in new_batch:
        new_batch[k] = torch.tensor(new_batch[k])

    return new_batch

# 데이터의 최대 길이 제한
max_seq_length=8192

# collate_fn 테스트 (배치 크기 1. 즉, 데이터 1개에 대해서 전처리를 진행해본다.)
example = train_dataset[0]
batch = collate_fn([example])

print("\n처리된 배치 데이터:")
print("입력 ID 형태:", batch["input_ids"].shape)
print("어텐션 마스크 형태:", batch["attention_mask"].shape)
print("레이블 형태:", batch["labels"].shape)

print('입력에 대한 정수 인코딩 결과:')
print(batch["input_ids"][0].tolist())

# 디코딩된 input_ids 출력
decoded_text = tokenizer.decode(
    batch["input_ids"][0].tolist(),
    skip_special_tokens=False,
    clean_up_tokenization_spaces=False
)

print("\ninput_ids 디코딩 결과:")
print(decoded_text)

print('레이블에 대한 정수 인코딩 결과:')
print(batch["labels"][0].tolist())

# -100이 아닌 부분만 골라 디코딩
label_ids = [token_id for token_id in batch["labels"][0].tolist() if token_id != -100]

decoded_labels = tokenizer.decode(
    label_ids,
    skip_special_tokens=False,
    clean_up_tokenization_spaces=False
)

print("\nlabels 디코딩 결과 (-100 제외):")
print(decoded_labels)

url : https://abit.ly/lisbva

댓글