https://github.com/allegroai/clearml/tree/master/examples
These examples are good material to practice with.
1. Task Types
- training (default) - Training a model
- testing - Testing a component, for example model performance
- inference - Model inference job (e.g. offline / batch model execution)
- controller - A task that lays out the logic for other tasks' interactions, manual or automatic (e.g. a pipeline controller)
- optimizer - A specific type of controller for optimization tasks (e.g. hyperparameter optimization)
- service - Long lasting or recurring service (e.g. server cleanup, auto ingress, sync services etc.)
- monitor - A specific type of service for monitoring
- application - A task implementing custom applicative logic, like autoscaler or clearml-session
- data_processing - Any data ingress / preprocessing (see ClearML Data)
- qc - Quality Control (e.g. evaluating model performance vs. blind dataset)
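The task type is just an argument to Task.init(); a minimal sketch (the project/task names here are made up):

from clearml import Task

# task_type accepts any of the Task.TaskTypes values listed above
task = Task.init(
    project_name="examples",
    task_name="preprocess my data",
    task_type=Task.TaskTypes.data_processing,
)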
2. class automation.HyperParameterOptimizer()
https://clear.ml/docs/latest/docs/references/sdk/hpo_optimization_hyperparameteroptimizer/
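A minimal usage sketch, assuming you already have a base task to clone (its id goes in base_task_id), hyperparameters connected under the General section, a "test"/"loss" scalar being reported (as in section 5 below), and a queue named default:

from clearml import Task
from clearml.automation import (
    HyperParameterOptimizer, UniformParameterRange, DiscreteParameterRange, RandomSearch,
)

# The controller task that drives the optimization
task = Task.init(project_name="examples", task_name="HPO", task_type=Task.TaskTypes.optimizer)

optimizer = HyperParameterOptimizer(
    base_task_id="<base-task-id>",  # placeholder: the experiment to clone and optimize
    hyper_parameters=[
        UniformParameterRange("General/base_lr", min_value=0.0001, max_value=0.01),
        DiscreteParameterRange("General/batch_size", values=[32, 64, 128]),
    ],
    objective_metric_title="test",   # title/series of the reported scalar to optimize
    objective_metric_series="loss",
    objective_metric_sign="min",
    optimizer_class=RandomSearch,
    execution_queue="default",       # queue the cloned experiments are sent to
    max_number_of_concurrent_tasks=2,
    total_max_jobs=10,
)
optimizer.start()
optimizer.wait()
optimizer.stop()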
3. ClearML agent
# install
pip install clearml-agent
# configure if needed (default docker image, app key, github credentials, aws credentials, ...)
clearml-agent init
Note that you can also just edit ~/clearml.conf directly afterwards (see https://github.com/allegroai/clearml-agent/blob/master/docs/clearml.conf for the format).
https://clear.ml/docs/latest/docs/clearml_agent/clearml_agent_ref/#parameters
The daemon takes many options; as one example, the following makes the agent pull jobs from a queue named default and run them inside Docker:
clearml-agent daemon --queue default --docker
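A slightly fuller variant using flags from the reference page above (the CUDA image name is just an example, pick whatever base image you need):

clearml-agent daemon --queue default --docker nvidia/cuda:11.8.0-runtime-ubuntu22.04 --gpus 0 --detached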
4. execute task remotely
for epoch in range(1, args.epochs + 1):
    if epoch > 1:
        # We run training for 1 epoch to make sure nothing crashes, then local execution is terminated.
        # Execution switches to remote execution by the agent listening on the specified queue.
        task.execute_remotely(queue_name="default")
    train(args, model, device, train_loader, optimizer, epoch)
    test(args, model, device, test_loader, epoch)
For example, if you run it like this and hand it off to remote execution, the task lands in the queue in a pending state.
5. report scalars
import torch
import torch.nn.functional as F
from clearml import Logger

def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            Logger.current_logger().report_scalar(
                "train", "loss", iteration=(epoch * len(train_loader) + batch_idx), value=loss.item())
def test(args, model, device, test_loader, epoch):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    Logger.current_logger().report_scalar(
        "test", "loss", iteration=epoch, value=test_loss)
    Logger.current_logger().report_scalar(
        "test", "accuracy", iteration=epoch, value=(correct / len(test_loader.dataset)))
With this, the Scalars tab shows:
- a train chart ⇒ loss over iterations
- a test chart ⇒ accuracy and loss over iterations
That said, I don't think we called Logger explicitly in the actual project; as I remember, calling self.log() in the model module, like
self.log(f'{phase}/loss', metrics_dict['total_loss'].compute(), prog_bar=True, sync_dist=True)
got picked up and logged automatically.
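That pattern works because ClearML's auto-logging hooks TensorBoard once Task.init() has run, and Lightning's self.log() writes through its default TensorBoard logger. A minimal sketch of the idea (module and metric names are made up):

import torch
import torch.nn.functional as F
import pytorch_lightning as pl
from clearml import Task

# Once the task exists, ClearML captures TensorBoard output automatically,
# so self.log() below lands in the Scalars tab without any Logger calls.
task = Task.init(project_name="examples", task_name="lightning auto-logging")

class LitClassifier(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.net = torch.nn.Linear(28 * 28, 10)

    def training_step(self, batch, batch_idx):
        x, y = batch
        loss = F.cross_entropy(self.net(x.view(x.size(0), -1)), y)
        self.log("train/loss", loss, prog_bar=True, sync_dist=True)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)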
6. add hyper-parameter to the task
from clearml import Task

task = Task.init(project_name="Image Example", task_name="Image classification with clearml-data")
params = {
    "number_of_epochs": 20,
    "batch_size": 64,
    "dropout": 0.25,
    "base_lr": 0.001,
    "momentum": 0.9,
    "loss_report": 100,
}
params = task.connect(params)  # enabling configuration override by clearml
You can see them in the web UI under Configuration → Hyperparameters.
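When an agent later runs a clone of this task, any values edited in the UI are written back into params by task.connect(), so the rest of the script just reads the overridden values from the dict.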
7. create dataset (with CSV in it) and upload
from clearml import StorageManager, Dataset

def main():
    manager = StorageManager()

    print("STEP1 : Downloading CSV dataset")
    csv_file_path = manager.get_local_copy(
        remote_url="https://allegro-datasets.s3.amazonaws.com/datasets/Iris_Species.csv")

    print("STEP2 : Creating a dataset")
    # By default, clearml data uploads to the clearml fileserver. Adding the output_uri argument
    # to the create() method allows you to specify custom storage like s3 / gcs / azure / local storage
    simple_dataset = Dataset.create(dataset_project="dataset_examples", dataset_name="CSV_Dataset")

    print("STEP3 : Adding CSV file to the Dataset")
    simple_dataset.add_files(path=csv_file_path)

    print("STEP4 : Upload and finalize")
    simple_dataset.upload()
    simple_dataset.finalize()
    print("We are done, have a great day :)")

if __name__ == '__main__':
    main()
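To consume the finalized dataset later (from a training task, say), Dataset.get() plus get_local_copy() gives you a cached local folder; a minimal sketch reusing the names above:

from clearml import Dataset

dataset = Dataset.get(dataset_project="dataset_examples", dataset_name="CSV_Dataset")
local_path = dataset.get_local_copy()  # cached, read-only local copy of the files
print(local_path)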
8. download MNIST dataset, sync folder, upload to ClearML server
import shutil
from uuid import uuid4

from pathlib2 import Path

from clearml import Dataset, StorageManager

def download_mnist_dataset():
    manager = StorageManager()
    mnist_dataset = Path(manager.get_local_copy(
        remote_url="https://allegro-datasets.s3.amazonaws.com/datasets/MNIST.zip", name="MNIST"))
    mnist_dataset_train = mnist_dataset / "TRAIN"
    mnist_dataset_test = mnist_dataset / "TEST"
    return mnist_dataset_train, mnist_dataset_test

def main():
    print("STEP1 : Downloading mnist dataset")
    mnist_dataset_train, mnist_dataset_test = download_mnist_dataset()

    print("STEP2 : Preparing mnist dataset folder")
    mnist_path = Path(f"MNIST_{uuid4().hex}")
    mnist_train_path = mnist_path / "TRAIN"
    mnist_test_path = mnist_path / "TEST"
    mnist_path.mkdir()

    print("STEP3 : Creating the dataset")
    mnist_dataset = Dataset.create(
        dataset_project="dataset_examples", dataset_name="MNIST Complete Dataset (Syncing Example)")

    print("STEP4 : Syncing train dataset")
    shutil.copytree(mnist_dataset_train, mnist_train_path)  # Populating dataset folder with TRAIN images
    mnist_dataset.sync_folder(mnist_path)
    mnist_dataset.upload()

    print("STEP5 : Syncing test dataset")
    shutil.copytree(mnist_dataset_test, mnist_test_path)  # Populating dataset folder with TEST images
    mnist_dataset.sync_folder(mnist_path)
    mnist_dataset.upload()

    print("STEP6 : Finalizing dataset")
    mnist_dataset.finalize()
    print("We are done, have a great day :)")

if __name__ == '__main__':
    main()
9. Task.init(): when is a new task created?
- The Task never ran before. No Task with the same task_name and project_name is stored in ClearML Server
- A new Task is forced by calling Task.init with reuse_last_task_id=False
⇒ in these cases, a new task is created
Otherwise, the already initialized Task object for the same task_name and project_name is returned, or, when being executed remotely on a clearml-agent, the task returned is the existing task from the backend.
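For instance, forcing a fresh task on every run (project/task names here are placeholders):

from clearml import Task

# reuse_last_task_id=False always creates a brand-new task,
# even if one with the same project/task name already exists
task = Task.init(project_name="examples", task_name="my experiment",
                 reuse_last_task_id=False)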
10. check task status with id
task = Task.get_task(task_id=task_id)
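With the task in hand, get_status() returns the current state as a string; a small sketch (the task id is a placeholder):

from clearml import Task

task = Task.get_task(task_id="<task-id>")  # placeholder id
print(task.get_status())  # e.g. "queued", "in_progress", "completed", "failed"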
11. define next task to be executed (enqueue it)
from clearml import Task
from time import sleep
# Initialize the Task Pipe's first Task used to start the Task Pipe
task = Task.init(
    "examples", "Simple Controller Task", task_type=Task.TaskTypes.controller
)
# Create a hyper-parameter dictionary for the task
param = dict()
# Connect the hyper-parameter dictionary to the task
param = task.connect(param)
# In this example we pass next task's name as a parameter
param["next_task_name"] = "Toy Base Task"
# This is a parameter name in the next task we want to change
param["param_name"] = "Example_Param"
# This is the parameter value in the next task we want to change
param["param_name_new_value"] = 3
# The queue where we want the template task (clone) to be sent to
param["execution_queue_name"] = "default"
# Simulate the work of a Task
print("Processing....")
sleep(2.0)
print("Done processing :)")
# Get a reference to the task to pipe to.
next_task = Task.get_task(
    project_name=task.get_project_name(), task_name=param["next_task_name"]
)
# Clone the task to pipe to. This creates a task with status Draft whose parameters can be modified.
cloned_task = Task.clone(source_task=next_task, name="Auto generated cloned task")
# Get the original parameters of the Task, modify the value of one parameter,
# and set the parameters in the next Task
cloned_task_parameters = cloned_task.get_parameters()
cloned_task_parameters[param["param_name"]] = param["param_name_new_value"]
cloned_task.set_parameters(cloned_task_parameters)
# Enqueue the Task for execution. The enqueued Task must already exist in the clearml platform
print(
    "Enqueue next step in pipeline to queue: {}".format(param["execution_queue_name"])
)
Task.enqueue(cloned_task.id, queue_name=param["execution_queue_name"])
# We are done. The next step in the pipe line is in charge of the pipeline now.
print("Done")
12. plot reporting
- https://clear.ml/docs/latest/docs/guides/reporting/plotly_reporting
- https://clear.ml/docs/latest/docs/guides/reporting/scatter_hist_confusion_mat_reporting
There are plenty more reporting helpers besides these; see the official docs.
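As a taste of the plot APIs from those pages, a minimal sketch (titles and series names are made up):

import numpy as np
from clearml import Task, Logger

task = Task.init(project_name="examples", task_name="plot reporting")
logger = Logger.current_logger()

# 2D scatter: appears under the task's Plots tab
scatter = np.random.rand(50, 2)
logger.report_scatter2d(
    "example plots", "random scatter", iteration=0,
    scatter=scatter, xaxis="x", yaxis="y", mode="markers")

# Confusion matrix
confusion = np.random.randint(0, 100, size=(5, 5))
logger.report_confusion_matrix(
    "example plots", "confusion", iteration=0, matrix=confusion)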
In practice, the features I use most are:
- keeping clearml-agent running on a server + setting it up to run in Docker
- creating tasks (output_uri pointing at AWS storage, base docker image, queue setup, execute remotely)
- hyperparameter logging, downloading checkpoints
and so on.