
ClearML Odds and Ends

민사민서 2024. 8. 29. 15:27

https://github.com/allegroai/clearml/tree/master/examples

 


These examples are good to practice with.

 

1. Task Types

  • training (default) - Training a model
  • testing - Testing a component, for example model performance
  • inference - Model inference job (e.g. offline / batch model execution)
  • controller - A task that lays out the logic for other tasks' interactions, manual or automatic (e.g. a pipeline controller)
  • optimizer - A specific type of controller for optimization tasks (e.g. hyperparameter optimization)
  • service - Long lasting or recurring service (e.g. server cleanup, auto ingress, sync services etc.)
  • monitor - A specific type of service for monitoring
  • application - A task implementing custom applicative logic, like autoscaler or clearml-session
  • data_processing - Any data ingress / preprocessing (see ClearML Data)
  • qc - Quality Control (e.g. evaluating model performance vs. blind dataset)
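
The type is chosen when the task is created, via the task_type argument of Task.init(); a minimal sketch (project and task names are placeholders):

from clearml import Task

# task_type defaults to "training"; here we mark the task as a data-processing step instead
# (project and task names below are placeholders)
task = Task.init(
    project_name="examples",
    task_name="preprocess images",
    task_type=Task.TaskTypes.data_processing,
)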

2. class automation.HyperParameterOptimizer()

https://clear.ml/docs/latest/docs/references/sdk/hpo_optimization_hyperparameteroptimizer/
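
The docs page above lists every argument; below is a rough sketch of how it is typically wired up, assuming a base training task already exists and reports a test/accuracy scalar. The base task ID, parameter names, and queue are placeholders, and the Optuna backend needs pip install optuna:

from clearml.automation import (
    DiscreteParameterRange,
    HyperParameterOptimizer,
    UniformIntegerParameterRange,
)
from clearml.automation.optuna import OptimizerOptuna  # needs: pip install optuna

# Placeholders: the base task id, parameter names, and queue below are examples only.
optimizer = HyperParameterOptimizer(
    base_task_id="<base training task id>",   # template task cloned for each trial
    hyper_parameters=[
        UniformIntegerParameterRange("General/number_of_epochs", min_value=5, max_value=20, step_size=5),
        DiscreteParameterRange("General/batch_size", values=[32, 64, 128]),
    ],
    objective_metric_title="test",       # scalar title reported by the base task
    objective_metric_series="accuracy",  # scalar series to optimize
    objective_metric_sign="max",         # maximize accuracy
    optimizer_class=OptimizerOptuna,
    execution_queue="default",           # agents listening on this queue run the trials
    max_number_of_concurrent_tasks=2,
    total_max_jobs=10,
)

optimizer.start()
optimizer.wait()   # block until the optimization finishes
optimizer.stop()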

 

3. ClearML agent

# install
pip install clearml-agent

# configure if needed (default docker, app key, github credentials, aws credentials, ...)
clearml-agent init

Note that you can also edit ~/clearml.conf directly later (see https://github.com/allegroai/clearml-agent/blob/master/docs/clearml.conf for the format).

 

https://clear.ml/docs/latest/docs/clearml_agent/clearml_agent_ref/#parameters

There are quite a few options; as one example, the command below makes the agent pull jobs from the default queue and run them in Docker.

clearml-agent daemon --queue default --docker
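
If you also want to pin a default image and keep the agent running in the background, something like the following should work (the CUDA image name here is just an example):

clearml-agent daemon --queue default --docker nvidia/cuda:11.8.0-runtime-ubuntu22.04 --detached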

 

4. execute task remotely

# `task` is the object returned earlier by Task.init()
for epoch in range(1, args.epochs + 1):
    if epoch > 1:
        # Run training locally for 1 epoch to make sure nothing crashes;
        # local execution is then terminated and the run switches to the
        # remote agent listening on the specified queue.
        task.execute_remotely(queue_name="default")
    train(args, model, device, train_loader, optimizer, epoch)
    test(args, model, device, test_loader, epoch)

For example, if you run it like this and then hand execution off to the remote agent, the job shows up in the queue in a pending state.

 

5. report scalars

import torch
import torch.nn.functional as F

from clearml import Logger


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            Logger.current_logger().report_scalar(
                "train", "loss", iteration=(epoch * len(train_loader) + batch_idx), value=loss.item())

def test(args, model, device, test_loader, epoch):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    Logger.current_logger().report_scalar(
        "test", "loss", iteration=epoch, value=test_loss)
    Logger.current_logger().report_scalar(
        "test", "accuracy", iteration=epoch, value=(correct / len(test_loader.dataset)))

With this, the Scalars tab shows:

  • a train chart ⇒ loss over iterations
  • a test chart ⇒ accuracy and loss over iterations

That said, I don't remember explicitly using Logger like this in real projects; there,

self.log(f'{phase}/loss', metrics_dict['total_loss'].compute(), prog_bar=True, sync_dist=True)

calling self.log() like this from the model module (a LightningModule) was logged automatically, as I recall.
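
That automatic capture presumably comes from ClearML's framework integration: once Task.init() has run, metrics written through TensorBoard (which Lightning's self.log() feeds when a TensorBoard logger is attached) are picked up as scalars. A minimal sketch, with the flags spelled out even though they are on by default (project and task names are placeholders):

from clearml import Task

# auto_connect_frameworks is True by default; shown explicitly only to
# illustrate where the automatic logging comes from
task = Task.init(
    project_name="examples",
    task_name="lightning training",
    auto_connect_frameworks={"tensorboard": True, "pytorch": True},
)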

 

6. add hyper-parameters to the task

task = Task.init(project_name="Image Example", task_name="Image classification with clearml-data")
params = {
    "number_of_epochs": 20,
    "batch_size": 64,
    "dropout": 0.25,
    "base_lr": 0.001,
    "momentum": 0.9,
    "loss_report": 100,
}
params = task.connect(params)  # enable configuration override by ClearML

You can see them in the web UI under Configuration > Hyperparameters.
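
The dict returned by task.connect() is the one the rest of the script should read from; when a clone is edited in the UI and executed by an agent, the overridden values show up there. A tiny usage sketch continuing the snippet above:

# read the (possibly overridden) values like a normal dict
batch_size = params["batch_size"]
base_lr = params["base_lr"]
print(f"training with batch_size={batch_size}, lr={base_lr}")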

 

7. create dataset (with CSV in it) and upload

from clearml import StorageManager, Dataset


def main():
    manager = StorageManager()

    print("STEP1 : Downloading CSV dataset")
    csv_file_path = manager.get_local_copy(
        remote_url="https://allegro-datasets.s3.amazonaws.com/datasets/Iris_Species.csv")

    print("STEP2 : Creating a dataset")
    # By default, clearml data uploads to the clearml fileserver. Adding output_uri argument to the create() method
    # allows you to specify custom storage like s3 \ gcs \ azure \ local storage
    simple_dataset = Dataset.create(dataset_project="dataset_examples", dataset_name="CSV_Dataset")

    print("STEP3 : Adding CSV file to the Dataset")
    simple_dataset.add_files(path=csv_file_path)

    print("STEP4 : Upload and finalize")
    simple_dataset.upload()
    simple_dataset.finalize()

    print("We are done, have a great day :)")


if __name__ == '__main__':
    main()
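
Downstream code can then pull the finalized dataset by project and name; a minimal sketch reusing the names from the script above:

from clearml import Dataset

# fetch the latest finalized version and get a cached local copy of its files
dataset = Dataset.get(dataset_project="dataset_examples", dataset_name="CSV_Dataset")
local_path = dataset.get_local_copy()
print(local_path)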

 

8. download MNIST dataset, sync folder, upload to ClearML server

import shutil
from uuid import uuid4

from pathlib2 import Path

from clearml import Dataset, StorageManager


def download_mnist_dataset():
    manager = StorageManager()
    mnist_dataset = Path(manager.get_local_copy(
        remote_url="https://allegro-datasets.s3.amazonaws.com/datasets/MNIST.zip", name="MNIST"))
    mnist_dataset_train = mnist_dataset / "TRAIN"
    mnist_dataset_test = mnist_dataset / "TEST"

    return mnist_dataset_train, mnist_dataset_test


def main():
    print("STEP1 : Downloading mnist dataset")
    mnist_dataset_train, mnist_dataset_test = download_mnist_dataset()

    print("STEP2 : Preparing mnist dataset folder")
    mnist_path = Path(f"MNIST_{uuid4().hex}")
    mnist_train_path = mnist_path / "TRAIN"
    mnist_test_path = mnist_path / "TEST"
    mnist_path.mkdir()

    print("STEP3 : Creating the dataset")
    mnist_dataset = Dataset.create(
        dataset_project="dataset_examples", dataset_name="MNIST Complete Dataset (Syncing Example)")

    print("STEP4 : Syncing train dataset")
    shutil.copytree(mnist_dataset_train, mnist_train_path)  # Populating dataset folder with TRAIN images
    mnist_dataset.sync_folder(mnist_path)
    mnist_dataset.upload()

    print("STEP5 : Syncing test dataset")
    shutil.copytree(mnist_dataset_test, mnist_test_path)  # Populating dataset folder with TEST images
    mnist_dataset.sync_folder(mnist_path)
    mnist_dataset.upload()

    print("STEP6 : Finalizing dataset")
    mnist_dataset.finalize()

    print("We are done, have a great day :)")

 

9. When does Task.init() create a new task?

- The Task never ran before. No Task with the same task_name and project_name is stored in ClearML Server

- A new Task is forced by calling Task.init with reuse_last_task_id=False

⇒ In these cases, a new task is created.

 

Otherwise, the already initialized Task object for the same task_name and project_name is returned, or, when being executed remotely on a clearml-agent, the task returned is the existing task from the backend.
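
A minimal sketch of forcing a brand-new task even when one with the same names already exists (names are placeholders):

from clearml import Task

task = Task.init(
    project_name="examples",
    task_name="my experiment",
    reuse_last_task_id=False,  # always create a fresh task instead of reusing the last one
)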

 

10. check task status with id

from clearml import Task

task = Task.get_task(task_id=task_id)
print(task.get_status())  # e.g. "queued", "in_progress", "completed", "failed"

 

11. define the next task to be executed (enqueue it)

from clearml import Task
from time import sleep

# Initialize the Task Pipe's first Task used to start the Task Pipe
task = Task.init(
    "examples", "Simple Controller Task", task_type=Task.TaskTypes.controller
)

# Create a hyper-parameter dictionary for the task
param = dict()
# Connect the hyper-parameter dictionary to the task
param = task.connect(param)

# In this example we pass next task's name as a parameter
param["next_task_name"] = "Toy Base Task"
# This is a parameter name in the next task we want to change
param["param_name"] = "Example_Param"
# This is the parameter value in the next task we want to change
param["param_name_new_value"] = 3
# The queue where we want the template task (clone) to be sent to
param["execution_queue_name"] = "default"

# Simulate the work of a Task
print("Processing....")
sleep(2.0)
print("Done processing :)")

# Get a reference to the task to pipe to.
next_task = Task.get_task(
    project_name=task.get_project_name(), task_name=param["next_task_name"]
)

# Clone the task to pipe to. This creates a task with status Draft whose parameters can be modified.
cloned_task = Task.clone(source_task=next_task, name="Auto generated cloned task")

# Get the original parameters of the Task, modify the value of one parameter,
#   and set the parameters in the next Task
cloned_task_parameters = cloned_task.get_parameters()
cloned_task_parameters[param["param_name"]] = param["param_name_new_value"]
cloned_task.set_parameters(cloned_task_parameters)

# Enqueue the Task for execution. The enqueued Task must already exist in the clearml platform
print(
    "Enqueue next step in pipeline to queue: {}".format(param["execution_queue_name"])
)
Task.enqueue(cloned_task.id, queue_name=param["execution_queue_name"])

# We are done. The next step in the pipe line is in charge of the pipeline now.
print("Done")

 

12. plot reporting
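
The docs cover many plot types; as one hedged example, a 2D scatter reported through the Logger ends up in the task's Plots tab (the data is random, purely for illustration):

import numpy as np

from clearml import Logger, Task

task = Task.init(project_name="examples", task_name="plot reporting sketch")

# report a random 10x2 scatter plot to the Plots tab
scatter = np.random.randint(0, 10, size=(10, 2))
Logger.current_logger().report_scatter2d(
    title="example_scatter",
    series="series_xy",
    iteration=0,
    scatter=scatter,
    xaxis="x",
    yaxis="y",
)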

 

There's plenty more besides this; go check the official docs..

 

In practice, the features I use most are:

- keeping clearml-agent running on a server + setting up Docker

- creating tasks (output URI pointing to AWS, base docker, queue setup, execute remotely; see the sketch below)

- logging hyperparameters, downloading checkpoints

and so on..
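
For reference, a rough sketch of that typical setup (the bucket, image, and queue names are placeholders):

from clearml import Task

task = Task.init(
    project_name="my_project",
    task_name="train",
    output_uri="s3://my-bucket/clearml",  # checkpoints and artifacts are uploaded here
)
task.set_base_docker("nvidia/cuda:11.8.0-runtime-ubuntu22.04")  # image the agent uses
task.execute_remotely(queue_name="default", exit_process=True)  # enqueue and stop the local run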