관리 메뉴

코딩 기록 저장소

[쿠버플로우] 파이프라인 스터디 본문

개인 공부/쿠버플로우

[쿠버플로우] 파이프라인 스터디

KimNang 2025. 8. 12. 10:47

목차

     

    1. 파이프라인 개요

    - 공식 문서를 참고함!

    01. 파이프라인 (Pipelines)

    - 쿠브플로우 파이프라인은 재사용 가능하고 확장성이 좋은 머신러닝 워크플로우를 정의하는 방법

    - 컨테이너 기반으로 작동하며, 전체 워크플로우를 구성하는 여러 단계(steps)와 입력 파라미터로 이루어져 있음

     

     

    02. 파이프라인 컴포넌트

    - 파이프라인 컴포넌트는 파이프라인의 한 단계를 수행하는 컨테이너화된 애플리케이션

    - 각 컴포넌트는 다음의 내용이 정의된 컴포넌트 명세(specifications)를 가짐

    • 인터페이스 : 입력(input) 및 출력(outputs)
    • 구현 : 사용할 컨테이너 이미지와 실행할 명령어
    • 메타데이터 : 컴포넌트의 이름 및 설명 등

    - 컴포넌트는 직접 명세를 정의하거나, 쿠브플로우 파이프라인 SDK를 사용해 파이썬 함수 기반으로 생성하거나, 이미 만들어진 컴포넌트를 재사용할 수 있음

     

     

    03. 파이프라인 그래프

    - 파이프라인의 각 단계는 컴포넌트의 인스턴스

    - 각 단계의 입력은 파이프라인의 입력 인자, 상수, 또는 다른 단계의 출력으로부터 올 수 있음

    - 쿠브플로우는 이러한 데이터 의존성을 기반으로 파이프라인의 워크플로우를 그래프 형태로 구성함

    - '통계 생성'과 '데이터 전처리' 모두 '데이터 수집'에만 의존하므로, 두 단계는 병렬로 실행될 수 있음

    • 데이터 수집 : 외부 소스에서 데이터를 불러와 데이터셋을 출력함. 의존성이 없으므로 가장 먼저 실행될 수 있음
    • 통계 생성 : '데이터 수집' 단계의 출력인 데이터셋을 사용함. 따라서 이 단계는 '데이터 수집' 이후에 실행되어야 함
    • 데이터 전처리 : 이 단계도 '데이터 수집' 단계의 출력을 모두 사용함.
    • 모델 학습 : '통계 생성'과 '데이터 전처리' 단계의 출력을 모두 사용함.

     

     

    04. 파이프라인 설계

    - 파이프라인을 설계할 때는 ML 워크플로우를 여러 컴포넌트로 어떻게 나눌지 고민해야 함

    - 이는 모놀리식 스크립트를 재사용 가능한 함수로 나누는 과정과 유사함

    • 단일 책임 원칙 : 컴포턴트는 하나의 기능만 담당해야 재사용성과 테스트 용이성이 높아짐
    • 재사용 : 가능하다면 쿠브플로우에서 제공하는 기존 컴포넌트를 재사용하는 것이 좋음
    • 디버깅 및 추적 : 파이프라인은 각 단계의 입력과 출력을 저장함. 이를 통해 모델 품질의 변화 원인을 파악하거나 워크플로우의 버그를 추적하기 쉬워짐

     

     

    05. 파이프라인 컴포넌트 구축

    • 컨테이너 명세 : 컨테이너화된 애플리케이션이 있다면, 컴포넌트 명세를 만들어 파이프라인 텀포넌트로 정의할 수 있음. 어떤 언어로 작성되었든 컨테이너 이미지로 만들 수 있다면 사용 가능함
    • 파이썬 함수 기반 컴포넌트 : 컴포넌트 코드가 파이썬 함수로 표현될 수 있다면, 쿠브플로우 파이프라인 SDK를 사용해 쉽게 컴포넌트를 생성할 수 있음
    • 사전 구축 컴포넌트 재사용 : 직접 컴포넌트를 만드는 대신, 이미 제공되는 컴포넌트를 재사용하여 개발 노력을 줄일 수 있음

     

     

    06. 컴포넌트 간 데이터 전달

    - 쿠브플로우 파이프라인은 컴포넌트를 실행할 때, 쿠버네티스 파드(Pod)에서 컨테이너 이미지를 시작하고, 입력값을 명령줄 인자로 전달함

    - 컴포넌트가 작업을 마치면, 출력값은 파일로 반환됨

    • 작은 데이터 : 숫자나 짧은 문자열 등 작은 입력값은 값(by value)으로 직접 전달할 수 있음
    • 큰 데이터 : 데이터셋과 같은 큰 입력값은 파일 경로(file paths)로 전달해야 함. 출력 역시 쿠브플로우가 제공하는 경로에 파일로 작성됨

    - 파이썬 함수 기반 컴포넌트는 이러한 입력 및 출력 전달 과정을 자동으로 처리해줘서 편리하게 컴포넌트를 만들 수 있음

     

     

    2. 파이프라인 패키지

    01. Python SDK (kfp)

    - kfp는 kubeflow pipelines을 개발할 수 있도록 해주는 Python SDK

    - 이를 통해 파이프라인 구성, 각 step 마다 실행되는 이미지 등을 설정 할 수 있음

     

    kfp 설치

    - 설치는 간단한 pip 커맨드로 설치가 가능함

    $ pip install kfp

    - kfp에는 총 4가지의 라이브러리 패키지가 있는데 그 중 세가지를 소개함

    1. kfp.Client
    2. kfp.compiler
    3. kfp.dsl

     

    kfp.Client

    - kubernetes에 설치된 kubeflow pipelines api와 통신하기 위한 클라이언트 라이브러리

    - 파이프라인 업로드, 실험 생성, run 생성 등의 기능이 있음

    • kfp.Client() : kubeflow pipelines api와 통신하기 위한 클라이언트 선언
    • .get_pipeline_id() : 파이프라인 이름으로 파이프라인 ID 검색
    • .upload_pipeline() : 변환된 파이썬 DSL 코드를 파이프라인으로 업로드
    • .upload_pipeline_version() : 위 .upload_pipeline()과 동일한 기능. 단 버전을 지정해서 업로드
    • .create_experiment() : 실험 생성
    • .run_pipeline() : 변환된 파이썬 DSL 코드를 해당 실험에서 RUN

     

    kfp.compiler

    - 파이프라인을 빌드하는 라이브러리

    • kfp.compiler.Compiler().compile() : 파이썬 DSL 코드를 파이프라인 업로드를 위한 YAML 파일로 변환

     

    kfp.dsl

    - 파이프라인의 각 step을 구성할때 사용되는 클래스, 모듈 등의 라이브러리

    • .ContainerOp() : 컨테이너 이미지로 구성된 pipeline task
    • .ResourceOp() : kubernetes resource를 생성하거나 다루는 pipeline task
    • .VolumeOp() : kubernetes PVC를 생성하는 pipeline task

     

    02. ContainerOp

    ContainerOp 개념

    - 컨테이너 이미지로 구성된 파이프라인 한 step을 의미함

    • 주요 파라미터
      • name : str 값으로 컴포넌트의 이름
      • image : 사용할 컨테이너 이미지 이름
      • command : 컨테이너 이미지 실행 명령어
      • arguments : 컨테이너 이미지 실행 인자값
      • file_outputs : 컨테이너 결과를 외부로 노출시킬 때 사용
      • pvolumes : volume을 컨테이너 경로에 마운트
    • 주요 메소드
      • .set_display_name() : 파이프라인 시각화 할 때 표시될 이름
      • .after() : 특정 파이프라인 step이 완료된 후에 실행 함을 지정
      • .add_volume() : 컨테이너에 volume을 생성
      • .add_volume_mount() : 컨테이너에 volume을 마운트
      • .add_env_variable() : 컨테이너 속 환경변수를 설정

     

    volume을 사용하는 세가지 방법

    - onprem 사용하여 volume을 mount 하는 경우 pvc가 미리 생성 되어야 함

    - ContainerOp의 전체 파라미터 및 사용법은 Docs에서 확인 가능함

    https://kubeflow-pipelines.readthedocs.io/en/sdk-2.14.2/

    from kfp import dsl
    from kfp import onprem
    from kubernetes import client as k8s_client
    
    @dsl.pipeline(
        name="pipeline example",
        description="example for volume"
    )
    def pipeline():
    	"""..생략.."""
    	
        # VolumeOp 사용 예제
    	data_vop = dsl.VolumeOp(
    		name="data-volume",
        	resource_name="data-pvc",
        	modes=dsl.VOLUME_MODE_RWO,
        	size="100Mi"
        )
        
        data_op = dsl.ContainerOp(
        	name="data preprocess",
            image="byeongjokim/data-preprocess",
            pvolumes={"/data": data_vop.volume}
        )
        
        
        # add_volume, add_volume_mount 사용
        data_op = dsl.ContainerOp(
        	name="data preprocess",
            image="byeongjokim/data-preprocess"
        ).add_volume(
        	k8s_client.V1Volume(
            	name='data-pvc'
            )
        )\
        .add_volume_mount(
        	k8s_client.V1VolumeMount(
            	mount_path="/data",
                name="data-pvc"
            )
        )
        
        # onperm 사용
        data_op = dsl.ContainerOp(
        	name="data preprocess",
            image="byeongjokim/data-preprocess"
        ).apply(onprem.mount_pvc("data-pvc", volume_name="data", volume_mount_path="/data"))

     

    03. kfp Example

    kfp Example

    - 위 코드와 같이 여러 ContainerOp와 .after()을 이용해 step과 순서를 지정해줌

    - 추가적으로 dsl.Condition()을 사용하여 특정 accuracy 이상일 경우에만 다음 step으로 이어지게 할 수 있음

    https://github.com/byeongjokim/MLOps-Example

     

    GitHub - byeongjokim/MLOps-Example

    Contribute to byeongjokim/MLOps-Example development by creating an account on GitHub.

    github.com

     

    (1) 공통 환경변수 정의

    - 모든 단계에서 사용할 관리 URL을 환경변수로 설정

    - 각 컨테이너에서 REST API 호출 등 모니터링·로그 수집에 활용

    ENV_MANAGE_URL = V1EnvVar(name='MANAGE_URL', value='http://220.116.228.93:8088/send')

     

     

    (2) 데이터 수집 및 전처리

    - 전처리 전용 컨테이너 이미지 실행

    - PVC(data-pvc)를 /data에 마운트하여 이후 단계와 공유

    data_0 = dsl.ContainerOp(
        name="load & preprocess data pipeline",
        image="byeongjokim/mnist-pre-data:latest",
    )

     

     

    (3) 데이터 검증

    - 전처리 완료 데이터를 검증

    - after(data_0)로 선행 의존성 보장

    data_1 = dsl.ContainerOp(
        name="validate data pipeline",
        image="byeongjokim/mnist-val-data:latest",
    ).after(data_0)

     

     

    (4) 임베딩 모델 학습

    - 임베딩 모델 학습 수행

    - 학습 결과는 train-model-pvc의 /model에 저장

    train_model = dsl.ContainerOp(
        name="train embedding model",
        image="byeongjokim/mnist-train-model:latest",
    ).after(data_1)

     

     

    (5) 임베딩 생성

    - 학습된 임베딩 모델을 활용하여 벡터 표현 생성

    embedding = dsl.ContainerOp(
        name="embedding data using embedding model",
        image="byeongjokim/mnist-embedding:latest",
    ).after(train_model)

     

     

    (6) FAISS 학습

    - 생성된 임베딩을 기반으로 FAISS 인덱스 학습

    - 이후 검색 및 분석 단계에서 사용

    train_faiss = dsl.ContainerOp(
        name="train faiss",
        image="byeongjokim/mnist-train-faiss:latest",
    ).after(embedding)

     

     

    (7) 분석 단계

    - 모델 성능 분석 및 평가

    • 산출물
      • 혼동행렬 : /confusion_matrix.csv
      • 정확도 값 : /accuracy.json
      • UI 메타데이터 : /mlpipeline-ui-metadata.json
      • 대시보드 메트릭 : /mlpipeline-metrics.json
    analysis = dsl.ContainerOp(
        name="analysis total",
        image="byeongjokim/mnist-analysis:latest",
        file_outputs={
            "confusion_matrix": "/confusion_matrix.csv",
            "mlpipeline-ui-metadata": "/mlpipeline-ui-metadata.json",
            "accuracy": "/accuracy.json",
            "mlpipeline_metrics": "/mlpipeline-metrics.json"
        }
    ).after(train_faiss)

     

     

    (8) 조건부 배포

    - 정확도(accuracy) > 0.8일 경우에만 배포 수행

    - 학습 모델을 deploy-model-pvc에 복사하여 서빙 환경에서 사용 가능

    baseline = 0.8
    with dsl.Condition(analysis.outputs["accuracy"] > baseline):
        deploy = dsl.ContainerOp(
            name="deploy mar",
            image="byeongjokim/mnist-deploy:latest",
        ).after(analysis)

     

     

    Upload Pipelines

    - 완성된 함수(mnist_pipeline)를 compile 한 후, 파이프라인에 올리고, run을 

    • kfp.Clinet() : kubeflow pipelines API 접근
    • kfp.compiler.Compiler().compile() : 해당 파이프라인 함수 compile
    • client.get_pipeline_id() : 현재 파이프라인 이름을 가진 파이프라인이 존재하는지 확인
    • client.upload_pipeline_version(), client.upload_pipeline() : 파이프라인 신규/버전 업로드
    • client.create_experiment() : experiment 생성 및 기존 experiment 접근
    • client.run_pipeline() : 파이프라인 run

     

     

     

     

     

     

     

    3. 파이프라인 구축

    01. 간단한 실습

    kfp SDK 설치 및 호출
    1. kfp SDK를 설치함
      - 터미널에 다음과 같이 입력함
      pip install kfp


    2. 모듈을 호출함
      - 설치된 kfp SDK에서 파이프라인 컴포넌트와 파이프라인을 정의하는 데 사용되는 dsl 모듈과 파이프라인을 컴파일하는 compiler 모듈을 불러옴
      from kfp import dsl
      from kfp import compiler

     

    파이프라인 구성요소 구축

    - 인사말을 출력하는 간단한 파이프라인을 구축함

    1. 컴포넌트 정의
      - 컴포넌트가 실행될 컨테이너 이미지 지정
      @dsl.component(base_image="python:3.9")
      def say_hello(name: str) -> str:
          hello_text = f'Hello, {name}!'
          print(hello_text)
          return hello_text
      • @dsl.component(base_image="python:3.9)
        - @dsl.comonent는 이 함수가 파이프라인의 컴포넌트임을 kubeflow에게 알려주는 데코레이터
        - base_image="python:3.9"는 이 컴포넌트가 실행될 컨테이너 이미지를 지정함. 모든 파이프라인 컴포넌트는 격리된 환경에서 실행되어야 하므로, Python 3.9가 설치된 기본 이미지를 사용하도록 설정함
    2. 파이프라인 정의
      - 컴포넌트들을 연결하여 전체 작업의 흐름을 설계하는 과정
      @dsl.pipeline
      def hello_pipeline(recipient: str) -> str:
          hello_task = say_hello(name=recipient)
          return hello_task.output
      • @dsl.pipeline
        - 이 함수가 파이프라인임을 나타내는 데코레이터
    3. 파이프라인 컴파일
      - 정의한 파이프라인을 YAML 파일로 컴파일하여 Kubeflow Pipelines에 업로드할 수 있는 형태로 저장
      compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')

     

    파이프라인 실행

    - 이 파이프라인을 'Kubeflow Pipelines' 서비스에 업로드함. 이때 두가지 방법이 있음

    1. pipeline을 yaml 파일로 만들어 Kubeflow UI에 업로드하는 방식
    2. 다른 하나는 Kubeflow Pipelines SDK 클라이언트를 사용해 직접 서버와 통신해 파이프라인을 실행하는 방식

    - 둘 중 1번 방법으로 진행함

    - 쿠버플로우 Pipelines의 'Upload pipeline' 클릭하여 진행함

    1. 이름 및 파일 설정
      - 아래의 사진과 같이 이름과 파일을 설정하고 'Create' 버튼을 클릭하여 생성함


    2. 생성 확인
      - 이렇게 생성이 된걸 확인할 수 있음


    3. run 생성
      - 오른쪽 상단의 'Create run' 버튼 클릭하여 생성함
      - Experiment가 없다면 그것부터 생성한 후에 선택함


    4. 'Run parameters' 에 아무거나 입력
      - Kubeflow Pipeline Test를 입력하여 테스트 해볼 예정


    5. 'Start' 클릭 후 결과 확인!
      - 체크 표시를 확인한 후 클릭해보면 Output을 확인할 수 있음!

     

     

    02. 학습 프로젝트 실행

    - 케라스 데이터셋으로부터 로이터 뉴스 기사 데이터를 로드하고, 학습 및 평가를 수행하는 파이프라인을 구축함

     

     

    데이터 로드 및 전처리

    1. 데코레이터 : @dsl.component( . . . )

    @dsl.component(
        base_image="tensorflow/tensorflow:2.15.0",
        packages_to_install=["matplotlib","joblib"]
    )
    • base_image
      - 해당 컴포넌트가 실행될 기본 도커 이미지
    • packages_to_install
      - 추가로 설치할 Python 패키지 ( matplotlib, joblib )

     

    2. 함수 정의

    def preprocess_data(
        reuters_data_path : dsl.OutputPath("Dataset"),
        num_words: int = 1000
    ) -> NamedTuple("Outputs",[("num_classes", int)]):
    • reuters_data_path
      - 전처리된 데이터를 저장할 출력 파일 경로 (Output Artifact)
      - 파이프라인에서 다음 단계 컴포넌트가 이 파일을 입력으로 받을 수 있음

    3. 데이터 로드

    (x_train, y_train), (x_test, y_test) = keras.datasets.reuters.load_data(num_words=num_words)

    - Keras에서 제공하는 Reuters 뉴스 데이터셋을 불러옴

    - 뉴스 기사(텍스트)는 숫자 인덱스로 변환되어 있으며, num_words보다 작은 단어 인덱스만 사용함

     

    4. 시퀀스 벡터화

    def vectorize_sequences(sequences, dimension=num_words):
        results = np.zeros((len(sequences), dimension))
        for i, seq in enumerate(sequences):
            results[i, seq] = 1.0
        return results

    - 문장을 단어 인덱스 리스트 → 원-핫 인코딩 벡터로 변환

    - dimension : num_words차원 벡터

     

    5. 학습/테스트 데이터 변환

    x_train = vectorize_sequences(x_train)
    x_test = vectorize_sequences(x_test)
    
    y_train = keras.utils.to_categorical(y_train)
    y_test = keras.utils.to_categorical(y_test)

    - 입력 데이터(x) : 원-핫 벡터화

    - 출력 데이터(y) : 각 뉴스 카테고리를 원-핫 라벨로 변환

     

    6. 결과 저장 및 반환

    reuters_data = {"x_train": x_train, "y_train": y_train, "x_test": x_test, "y_test": y_test }
    joblib.dump(reuters_data, reuters_data_path)
    
    return (num_classes,)
    • joblib.dump()
      - 데이터를 파일로 저장함
      - 다음 단계에서 입력으로 사용됨
    • return (num_classes,)
      - 뉴스 카테고리의 개수를 반환
      - 이 값은 MLMD(Metadata store)에 기록되어 파이프라인의 다른 컴포넌트에서 메타데이터로 참조할 수 있음

     

    모델 학습

    1. 데코레이터 : @dsl.component( . . . )

    @dsl.component(
        base_image="tensorflow/tensorflow:2.15.0",
        packages_to_install=["joblib"]
    )
    • @dsl.component
      - Kubeflow Pipeline에서 사용할 "단일 컴포넌트"를 정의하는 데 사용됨
    • base_image
      - 이 컴포넌트가 실행될 기본 Docker 이미지
    • packages_to_install
      - 추가로 설치할 Python 패키지 목록
      - joblib를 설치해서 데이터를 로드할 수 있게 함

     

    2. 함수 정의

    def train_model(
        reuters_data_path: dsl.InputPath("Dataset"),
        num_words: int,
        num_classes: int,
        model_out: dsl.OutputPath("Model"),
        history_out: dsl.OutputPath("Metrics"),
        epochs: int = 10,
        batch_size: int = 512
    ) -> NamedTuple("Outputs", [("model_path", str)]):
    • dsl.InputPath()
      - 파이프라인 상에서 이전 단계의 출력(Dataset)을 "파일 경로" 형태로 받음
    • dsl.OutputPath("Model") / ("Metrics")
      - 이 컴포넌트가 생성할 결과물을 저장할 경로
      - Kubeflow가 자동으로 이 파일들을 아티팩트로 관리함
    • NamedTuple
      - 반환값 형식을 지정함

     

    3. 데이터 로드

    reuters_data = joblib.load(reuters_data_path)

    - 데이터 전처리 과정에서 딕셔너리로 저장해뒀던 reuters_data를 로드함

     

    4. 모델 정의

    model = keras.Sequential([
        layers.Dense(64, activation="relu", input_shape=(num_words,)),
        layers.Dense(64, activation="relu"),
        layers.Dense(num_classes, activation="softmax")
    ])

    - 입력 차원 : num_words

    - 은닉층 2개 (64개의 노드, ReLU)

    - 출력층: 클래스 개수(num_classes), softmax로 다중 분류

     

    5. 컴파일

    model.compile(
        optimizer="rmsprop",
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    - 옵티마이저 : RMSProp

    - 손실함수 : categorical_crossentropy

    - 평가 지표 : accuracy

     

     

    6. 학습

    history = model.fit(
        reuters_data["x_train"], reuters_data["y_train"],
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.2,
        verbose=2
    )

    - 학습 데이터에서 20%를 검증용으로 사용함

    - 학습 로그는 history 객체에 저장됨

     

     

    7. 결과 저장 및 반환

    model.save(model_out)
    
    with open(history_out, "w") as f:
        json.dump(history.history, f)
    
    return (model_out,)
    • model.save(model_out)
      - 학습된 모델을 Kubeflow의 아티팩트 경로(model_out)에 저장
      - 이 경로는 다음 컴포넌트가 dsl.InputPath("Model")로 받을 수 있음

    • with open(history_out, "w") as f :
      - 학습 결과(손실, 정확도 등)를 JSON 파일로 저장
      - Kubeflow UI에서 Metrics로 시각화 가능

    • return (model_out,)
      - NamedTuple 형식으로 model_path 문자열을 반환
      - Kubeflow 파이프라인 상에서 "이 컴포넌트의 출력"으로 인식됨

     

    모델 평가

    1. 컴포넌트 정의

    @dsl.component(
        base_image="tensorflow/tensorflow:2.15.0",
        packages_to_install=["joblib"]
    )

     

    • @dsl.component
      - Kubeflow Pipeline에서 하나의 컴포넌트를 정의하는 부분

    • base_image
      - 실행 환경으로 사용할 Docker 이미지

    • packages_to_install
      - 추가로 필요한 패키지를 지정함
      - joblib을 설치하여 .joblib 파일로 저장된 데이터를 읽을 수 있게 함

     

    2. 함수 정의

    def evaluate_model(
        model: dsl.InputPath("Model"),
        reuters_data_path: dsl.InputPath("Dataset")
    ) -> float:

     

    • model
      - train_model 단계에서 저장한 모델 파일 경로
      - dsl.InputPath("Model") → Kubeflow가 모델 파일을 이 경로에 자동으로 복사

    • reuters_data_path
      평가에 사용할 데이터셋 파일 경로 (joblib 형식)

    • 반환값(float)
      - 평가 정확도(accuracy)를 float 값으로 반환
      - Kubeflow의 ML Metadata(MLMD)에 자동 기록됨

     

    3. 데이터 및 모델 불러오기

    reuters_data = joblib.load(reuters_data_path)
    model = keras.models.load_model(model)

    - 전처리 단계에서 저장했던 Reuters 데이터셋을 로드함

    - 모델은 학습 때 저장된 TensorFlow Keras 모델을 불러옴

     

    4. 모델 평가

    loss, acc = model.evaluate(reuters_data["x_test"], reuters_data["y_test"], verbose=0)
    print(f"테스트 정확도: {acc:.4f}")

     

    • model.evaluate()
      - 주어진 테스트셋으로 모델 성능을 평가함
      • loss : 손실값 (crossentropy 등)
      • acc : 정확도 (accuracy)
    • verbose=0
      - 학습 로그를 생략하고 결과만 반환.

    - print()를 통해 Kubeflow 로그창에 "테스트 정확도: 0.XXXX" 형태로 출력

     

     

    5. 결과 반환

    return float(acc)

    - Kubeflow에서는 컴포넌트가 반환하는 단일 숫자(float, int, str 등)를 파이프라인 결과 파라미터로 저장함

     

    결과 시각화

    1. 컴포넌트 정의

    @dsl.component(
        base_image="python:3.9",
        packages_to_install=["matplotlib"]
    )

     

     

    • @dsl.component()
      - Kubeflow Pipelines에서 하나의 컴포넌트를 정의함

    • base_image="python:3.9"
      - Python 3.9 환경을 사용함

    • packages_to_install=["matplotlib"]
      - matplotlib 패키지를 설치해 그래프를 그림

     

    2. 함수 정의

    def plot_history(
        history_path: dsl.InputPath("Metrics"),
        plot_out: dsl.OutputPath("Plot"),
        metrics_out: dsl.OutputPath("Metrics")
    ):

     

    • history_path
      - 이전 단계에서 전달된 history JSON 파일의 경로
    • plot_out
      - 그래프를 저장할 출력 이미지 파일 경로
    • metrics_out
      - 최종 성능 지표를 저장할 출력 JSON 경로

    이 세 인자는 Kubeflow에서 artifact(산출물) 로 관리됨

     

    3. 학습 기록 불러오기

    with open(history_path, "r") as f:
        history = json.load(f)

     

    - 이전 단계(train_model 등)에서 저장한 history.json 파일을 불러옴

     

    4. 그래프 시각화

    plt.figure(figsize=(12, 5))
    • Loss 그래프
      - 훈련 손실(Loss)과 검증 손실(Val_loss)을 한 그래프에 표시함
      plt.subplot(1, 2, 1)
      plt.plot(epochs, loss, "bo-", label="Training loss")
      plt.plot(epochs, val_loss, "ro-", label="Validation loss")


    • Accuracy 그래프
      - 훈련 정확도와 검증 정확도를 표시함
      plt.subplot(1, 2, 2)
      plt.plot(epochs, acc, "bo-", label="Training acc")
      plt.plot(epochs, val_acc, "ro-", label="Validation acc")



     

     

     

     

     

    https://byeongjo-kim.tistory.com/28