Stable Diffusion로 txt2img 파인튜닝해보기(2)

목록으로 돌아가기

본 글은 keras 공식 홈페이지의 스테이블 디퓨전 파인튜닝 예시 코드를 기반으로 작성됐습니다.
함께 학습한 안효주님의 블로그 바로가기




코드를 단락별로 설명해볼게요!



import

텐서플로우와 keras를 사용합니다.

from textwrap import wrap
import os
import keras_cv
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.experimental.numpy as tnp
from keras_cv.models.stable_diffusion.clip_tokenizer import SimpleTokenizer
from keras_cv.models.stable_diffusion.diffusion_model import DiffusionModel
from keras_cv.models.stable_diffusion.image_encoder import ImageEncoder
from keras_cv.models.stable_diffusion.noise_scheduler import NoiseScheduler
from keras_cv.models.stable_diffusion.text_encoder import TextEncoder
from tensorflow import keras



데이터 로딩

먼저 pandas 라이브러리를 사용하여 CSV 파일인 “./dataset.csv”를 로드합니다. 이 CSV 파일에는 텍스트 데이터와 이미지 경로가 포함되어 있습니다.

data_frame = pd.read_csv("./dataset.csv")



텍스트 데이터 전처리

텍스트 데이터를 처리하기 위해 PADDING_TOKEN과 MAX_PROMPT_LENGTH를 설정합니다. PADDING_TOKEN은 텍스트의 패딩을 나타내며, MAX_PROMPT_LENGTH는 텍스트의 최대 길이를 나타냅니다.

PADDING_TOKEN = 49407
MAX_PROMPT_LENGTH = 77

tokenizer = SimpleTokenizer()

process_text 함수는 주어진 캡션 텍스트를 토큰화하고 패딩하여 고정된 길이로 만드는 역할을 합니다. 이 함수는 SimpleTokenizer를 사용하여 텍스트를 토큰화하고, 부족한 부분은 PADDING_TOKEN으로 채웁니다.

def process_text(caption):
    tokens = tokenizer.encode(caption)
    tokens = tokens + [PADDING_TOKEN] * (MAX_PROMPT_LENGTH - len(tokens))
    return np.array(tokens)

그 다음 아래 코드는 모든 텍스트 캡션을 토큰화하고 패딩하여 tokenized_texts 배열에 저장합니다. 각 행은 하나의 캡션을 나타내며, 각 행의 길이는 MAX_PROMPT_LENGTH와 같아집니다.

tokenized_texts = np.empty((len(data_frame), MAX_PROMPT_LENGTH))

all_captions = list(data_frame["caption"].values)
for i, caption in enumerate(all_captions):
    tokenized_texts[i] = process_text(caption)

이미지 데이터 전처리

이미지 데이터를 처리하기 위해 이미지 해상도를 RESOLUTION으로 설정하고, TensorFlow의 자동 배치 크기 지정을 위해 AUTO를 설정합니다. 또한 텍스트의 위치 정보를 나타내는 POS_IDS를 설정합니다.

RESOLUTION = 256
AUTO = tf.data.AUTOTUNE
POS_IDS = tf.convert_to_tensor([list(range(MAX_PROMPT_LENGTH))], dtype=tf.int32)

augmenter는 이미지 데이터에 대한 데이터 증강(Data Augmentation)을 정의하는 부분입니다. 이미지를 중앙에서 자르고 무작위로 뒤집고, 픽셀 값을 [-1, 1] 범위로 스케일링하는 작업을 수행합니다.

augmenter = keras.Sequential(
    layers=[
        keras_cv.layers.CenterCrop(RESOLUTION, RESOLUTION),
        keras_cv.layers.RandomFlip(),
        tf.keras.layers.Rescaling(scale=1.0 / 127.5, offset=-1),
    ]
)

text_encoder는 텍스트 데이터를 처리하기 위한 텍스트 인코더를 정의하는 부분입니다. 이 인코더는 텍스트 데이터를 모델 입력에 맞게 인코딩합니다.

text_encoder = TextEncoder(MAX_PROMPT_LENGTH)



이미지 데이터 처리

process_image 함수는 이미지 파일의 경로(image_path)와 해당 이미지와 관련된 토큰화된 텍스트(tokenized_text)를 입력으로 받아, 이미지를 읽고 디코딩한 후 지정된 해상도(RESOLUTION)로 크기를 조정합니다.

def process_image(image_path, tokenized_text):
    image = tf.io.read_file(image_path)  # 이미지 파일 읽기
    image = tf.io.decode_png(image, 3)  # PNG 형식 이미지 디코딩
    image = tf.image.resize(image, (RESOLUTION, RESOLUTION))  # 이미지 크기 조정
    return image, tokenized_text



데이터 증강 (Data Augmentation)

apply_augmentation 함수는 이미지와 텍스트 데이터 배치를 입력으로 받아 데이터 증강을 적용합니다. 이미지 데이터에는 이미지 크기 조정 및 무작위 뒤집기와 같은 증강 작업이 수행됩니다.

def apply_augmentation(image_batch, token_batch):
    return augmenter(image_batch), token_batch



텍스트 인코딩

run_text_encoder 함수는 이미지 배치와 토큰 배치를 입력으로 받아, 텍스트 인코더를 사용하여 텍스트를 인코딩합니다. 이 함수는 이미지, 토큰, 그리고 텍스트의 인코딩 결과를 반환합니다.

def run_text_encoder(image_batch, token_batch):
    return (
        image_batch,
        token_batch,
        text_encoder([token_batch, POS_IDS], training=False),
    )

데이터셋 준비

prepare_dict 함수는 이미지, 토큰, 그리고 인코딩된 텍스트를 딕셔너리로 묶어 반환합니다. prepare_dataset 함수는 이미지 파일 경로와 토큰화된 텍스트 데이터를 입력으로 받아 데이터셋을 준비합니다. 데이터셋은 다음과 같은 단계를 거쳐 준비됩니다:

이미지 파일 경로와 토큰화된 텍스트를 결합하고 데이터를 무작위로 섞습니다. process_image 함수를 사용하여 이미지 데이터를 처리하고 배치를 구성합니다. 이미지 데이터에 데이터 증강을 적용합니다. run_text_encoder 함수를 사용하여 텍스트를 인코딩합니다. prepare_dict 함수를 사용하여 데이터를 딕셔너리로 묶고 데이터셋을 최적화합니다.

def prepare_dict(image_batch, token_batch, encoded_text_batch):
    return {
        "images": image_batch,
        "tokens": token_batch,
        "encoded_text": encoded_text_batch,
    }

def prepare_dataset(image_paths, tokenized_texts, batch_size=1):
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, tokenized_texts))
    dataset = dataset.shuffle(batch_size * 10)
    dataset = dataset.map(process_image, num_parallel_calls=AUTO).batch(batch_size)
    dataset = dataset.map(apply_augmentation, num_parallel_calls=AUTO)
    dataset = dataset.map(run_text_encoder, num_parallel_calls=AUTO)
    dataset = dataset.map(prepare_dict, num_parallel_calls=AUTO)
    return dataset.prefetch(AUTO)

prepare_dataset 함수를 사용하여 데이터셋을 준비합니다. 이미지 파일 경로(data_frame[“image_path”])와 토큰화된 텍스트(tokenized_texts)를 입력으로 사용하며, 배치 크기는 4로 설정합니다. 이렇게 생성된 training_dataset은 학습에 사용될 데이터셋입니다.

# Prepare the dataset.
training_dataset = prepare_dataset(
    np.array(data_frame["image_path"]), tokenized_texts, batch_size=4
)

Train 클래스

이 코드는 Trainer 클래스의 초기화 메서드입니다. 클래스는 Diffusion 모델 (diffusion_model), VAE (Variational Autoencoder) 모델 (vae), 노이즈 스케줄러 (noise_scheduler) 등을 초기화합니다. use_mixed_precision은 혼합 정밀도(mixed precision)를 사용할지 여부를 나타내는 플래그이며, max_grad_norm은 그래디언트 클리핑을 위한 최대 그래디언트 노름(norm) 값입니다. vae 모델은 학습에서 제외됩니다.

class Trainer(tf.keras.Model):
    def __init__(
        self,
        diffusion_model,
        vae,
        noise_scheduler,
        use_mixed_precision=False,
        max_grad_norm=1.0,
        **kwargs
    ):
        super().__init__(**kwargs)

        self.diffusion_model = diffusion_model
        self.vae = vae
        self.noise_scheduler = noise_scheduler
        self.max_grad_norm = max_grad_norm

        self.use_mixed_precision = use_mixed_precision
        self.vae.trainable = False

이 코드는 Trainer 클래스의 핵심 학습 메서드인 train_step 메서드입니다. images와 encoded_text를 입력으로 받고, 그래디언트 계산을 위한 tf.GradientTape를 설정합니다. vae 모델을 사용하여 이미지 데이터에서 잠재 벡터 latents를 샘플링합니다. 잠재 벡터에 노이즈를 추가하고 timesteps를 생성합니다. target은 노이즈입니다. timestep_embedding은 시간 단계를 위한 임베딩을 생성합니다. model_pred는 Diffusion 모델에 입력 데이터를 전달한 결과입니다. 손실(loss)은 target과 model_pred 간의 평균 제곱 오차(MSE)입니다. 그래디언트를 계산하고 클리핑을 수행한 후, 모델 가중치를 업데이트합니다.

    def train_step(self, inputs):
        images = inputs["images"]
        encoded_text = inputs["encoded_text"]
        batch_size = tf.shape(images)[0]

        with tf.GradientTape() as tape:
            latents = self.sample_from_encoder_outputs(self.vae(images, training=False))
            latents = latents * 0.18215

            noise = tf.random.normal(tf.shape(latents))

            timesteps = tnp.random.randint(
                0, self.noise_scheduler.train_timesteps, (batch_size,)
            )

            noisy_latents = self.noise_scheduler.add_noise(
                tf.cast(latents, noise.dtype), noise, timesteps
            )

            target = noise

            timestep_embedding = tf.map_fn(
                lambda t: self.get_timestep_embedding(t), timesteps, dtype=tf.float32
            )
            timestep_embedding = tf.squeeze(timestep_embedding, 1)
            model_pred = self.diffusion_model(
                [noisy_latents, timestep_embedding, encoded_text], training=True
            )
            loss = self.compiled_loss(target, model_pred)
            if self.use_mixed_precision:
                loss = self.optimizer.get_scaled_loss(loss)

        trainable_vars = self.diffusion_model.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        if self.use_mixed_precision:
            gradients = self.optimizer.get_unscaled_gradients(gradients)
        gradients = [tf.clip_by_norm(g, self.max_grad_norm) for g in gradients]
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        return {m.name: m.result() for m in self.metrics}

get_timestep_embedding 메서드는 시간 단계(timestep)를 임베딩하는 데 사용됩니다. dim은 임베딩 차원 수를 나타내며, 기본값은 320입니다. max_period는 최대 주기를 나타내며, 기본값은 10000입니다. log_max_period는 max_period의 자연 로그를 계산합니다. freqs는 주기 함수의 주파수를 계산합니다. args는 timestep과 주파수를 곱한 값을 나타냅니다. embedding은 코사인(cosine)과 사인(sine) 함수를 사용하여 임베딩을 생성하고, 이를 하나의 벡터로 결합한 다음 형태를 조정합니다. 최종적으로 임베딩 벡터를 반환합니다.

def get_timestep_embedding(self, timestep, dim=320, max_period=10000):
    half = dim // 2
    log_max_period = tf.math.log(tf.cast(max_period, tf.float32))
    freqs = tf.math.exp(
        -log_max_period * tf.range(0, half, dtype=tf.float32) / half
    )
    args = tf.convert_to_tensor([timestep], dtype=tf.float32) * freqs
    embedding = tf.concat([tf.math.cos(args), tf.math.sin(args)], 0)
    embedding = tf.reshape(embedding, [1, -1])
    return embedding

sample_from_encoder_outputs 메서드는 VAE(Variational Autoencoder) 모델의 출력에서 샘플링하는 데 사용됩니다. outputs는 VAE 모델의 출력으로부터 평균(mean)과 로그 분산(logvar)을 추출합니다. logvar 값을 -30.0에서 20.0 사이의 범위로 클리핑합니다. std는 표준 편차(standard deviation)를 계산합니다. sample은 평균이 0이고 표준 편차가 1인 정규 분포에서 샘플을 생성합니다. 생성된 샘플을 통해 잠재 공간(latent space)에서 샘플링한 값을 반환합니다.

def sample_from_encoder_outputs(self, outputs):
    mean, logvar = tf.split(outputs, 2, axis=-1)
    logvar = tf.clip_by_value(logvar, -30.0, 20.0)
    std = tf.exp(0.5 * logvar)
    sample = tf.random.normal(tf.shape(mean), dtype=mean.dtype)
    return mean + std * sample

save_weights 메서드는 모델 가중치를 파일로 저장하는 데 사용됩니다. filepath는 가중치를 저장할 파일 경로를 나타냅니다. overwrite는 파일이 이미 존재할 경우 덮어쓸지 여부를 결정하는 플래그입니다. save_format은 가중치 파일의 저장 포맷을 지정합니다. 예를 들어, “h5” 또는 “tf”와 같은 포맷을 선택할 수 있습니다. options는 저장 옵션을 설정하는데 사용됩니다.

def save_weights(self, filepath, overwrite=True, save_format=None, options=None):
    self.diffusion_model.save_weights(
        filepath=filepath,
        overwrite=overwrite,
        save_format=save_format,
        options=options,
    )

모델 및 학습 설정

USE_MP는 mixed precision 학습을 사용할지 여부를 나타내는 플래그입니다. keras.mixed_precision.set_global_policy(“mixed_float16”)은 mixed precision을 설정하는 부분으로, 텐서플로에서 학습 속도를 향상시키기 위해 FP16 혼합 정밀도(mixed precision)를 사용합니다. 이를 통해 GPU 메모리를 효율적으로 활용하며 모델을 빠르게 학습시킬 수 있습니다.

USE_MP = True
if USE_MP:
    keras.mixed_precision.set_global_policy("mixed_float16")

ImageEncoder는 이미지를 임베딩하는 역할을 하는 모델입니다. RESOLUTION으로 정의된 이미지 해상도를 입력으로 받습니다.

image_encoder = ImageEncoder(RESOLUTION, RESOLUTION)

Trainer 클래스의 인스턴스 diffusion_ft_trainer를 생성합니다. diffusion_model은 이미지 생성에 사용되는 모델로, 이미지 해상도와 최대 프롬프트 길이를 인자로 받습니다. vae는 변이형 오토인코더(Variational Autoencoder) 모델을 나타내며, 이미지 인코딩에 사용됩니다. noise_scheduler는 학습 중에 노이즈를 조절하는 스케줄러를 설정합니다. use_mixed_precision은 mixed precision 학습을 사용할지 여부를 지정합니다.

diffusion_ft_trainer = Trainer(
    diffusion_model=DiffusionModel(RESOLUTION, RESOLUTION, MAX_PROMPT_LENGTH),
    vae=tf.keras.Model(
        image_encoder.input,
        image_encoder.layers[-2].output,
    ),
    noise_scheduler=NoiseScheduler(),
    use_mixed_precision=USE_MP,
)

옵티마이저 설정

학습에 사용할 옵티마이저를 설정합니다. 여기서는 AdamW 옵티마이저를 사용합니다. lr은 학습률(learning rate)을 나타냅니다. beta_1과 beta_2는 Adam 옵티마이저의 하이퍼파라미터입니다. weight_decay는 가중치 감소(weight decay)를 설정하는데, 모델의 일반화를 돕는 역할을 합니다. epsilon은 수치 안정성을 위한 작은 값입니다.

lr = 1e-5
beta_1, beta_2 = 0.9, 0.999
weight_decay = (1e-2,)
epsilon = 1e-08

optimizer = tf.keras.optimizers.experimental.AdamW(
    learning_rate=lr,
    weight_decay=weight_decay,
    beta_1=beta_1,
    beta_2=beta_2,
    epsilon=epsilon,
)

모델 컴파일 및 학습

모델을 컴파일합니다. 이 단계에서 모델에 옵티마이저와 손실 함수를 설정합니다.

diffusion_ft_trainer.compile(optimizer=optimizer, loss="mse")

epochs는 전체 학습 데이터셋을 몇 번 반복할지 결정합니다. ckpt_path는 학습된 모델 가중치를 저장할 파일 경로입니다. ModelCheckpoint 콜백은 학습 중에 모델의 가중치를 저장하는데 사용됩니다. 여기서는 손실(loss)이 최소일 때 모델을 저장하도록 설정되어 있습니다.

epochs = 200
ckpt_path = "finetuned_stable_diffusion.h5"
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(
    ckpt_path,
    save_weights_only=True,
    monitor="loss",
    mode="min",
)

fit 메서드를 사용하여 모델을 실제로 학습시킵니다. 학습 데이터셋인 training_dataset과 설정된 에포크 수만큼 학습이 진행됩니다.

diffusion_ft_trainer.fit(training_dataset, epochs=epochs, callbacks=[ckpt_callback])

모델은 학습이 완료가되면 위에서 선언한대로 저장이 됩니다.



Etc

author-profile
Written by 유찬영

댓글