InfoGrab DocsInfoGrab Docs

GitLab EventStore

요약

GitLab 모노리스 프로젝트는 점점 커지고 있으며, 더 많은 도메인이 정의되고 있습니다. 대표적인 예로 PostReceive 워커가 있습니다. 단일 책임 원칙(Single Responsibility Principle)을 위반합니다.

배경#

GitLab 모노리스 프로젝트는 점점 커지고 있으며, 더 많은 도메인이 정의되고 있습니다. 그 결과, 이러한 도메인들은 시간적 결합(temporal coupling)으로 인해 서로 얽히게 됩니다.

대표적인 예로 PostReceive 워커가 있습니다. 이 워커에서는 여러 도메인에 걸쳐 많은 작업이 이루어집니다. 새 커밋 푸시에 반응하는 새로운 동작이 필요하면, PostReceive 또는 하위 컴포넌트(예: Git::ProcessRefChangesService)에 코드를 추가하게 됩니다.

이러한 아키텍처는:

  • 단일 책임 원칙(Single Responsibility Principle)을 위반합니다.

  • 익숙하지 않은 코드베이스에 코드를 추가하는 위험을 증가시킵니다. 알지 못하는 미묘한 부분이 있을 수 있으며, 이로 인해 버그나 성능 저하가 발생할 수 있습니다.

  • 도메인 경계를 위반합니다. 특정 네임스페이스(예: Git::) 내부에서 다른 도메인의 클래스(Ci:: 또는 MergeRequests::)가 불쑥 등장합니다.

EventStore란?#

Gitlab::EventStore는 기존의 Sidekiq 워커와 현재의 옵저버빌리티(observability) 위에 구축된 기본적인 pub-sub 시스템입니다. 이 시스템을 사용하여 도메인을 모델링할 때 이벤트 기반 접근 방식을 적용하면서 결합도를 최소화합니다.

이 방식은 기존 Sidekiq 워커를 그대로 유지하면서 비동기 작업을 수행하지만, 의존성의 방향을 역전시킵니다.

EventStore 예시#

CI 파이프라인이 생성될 때, 파이프라인의 ref와 일치하는 모든 머지 리퀘스트의 헤드 파이프라인을 업데이트합니다. 그러면 머지 리퀘스트에서 최신 파이프라인의 상태를 표시할 수 있습니다.

EventStore 없이#

Ci::CreatePipelineService를 변경하고 파이프라인이 생성되었는지 확인하는 로직(예: if 문)을 추가합니다. 그런 다음, MergeRequests:: 도메인의 사이드 이펙트를 실행할 워커를 예약합니다.

이 방식은 개방-폐쇄 원칙(Open-Closed Principle)을 위반하며 다른 도메인의 사이드 이펙트 로직을 불필요하게 추가하여 결합도를 높입니다:

graph LR subgraph ci[CI] cp[CreatePipelineService] end

subgraph mr[MergeRequests] upw[UpdateHeadPipelineWorker] end

subgraph no[Namespaces::Onboarding] pow[PipelinesOnboardedWorker] end

cp -- perform_async --> upw cp -- perform_async --> pow

EventStore 사용 시#

Ci::CreatePipelineServiceCi::PipelineCreatedEvent 이벤트를 발행하고, 역할이 여기서 끝납니다.

MergeRequests:: 도메인은 워커 MergeRequests::UpdateHeadPipelineWorker로 이 이벤트를 구독할 수 있으므로:

  • 사이드 이펙트는 비동기적으로 예약되어, 도메인 이벤트를 발행하는 주 비즈니스 트랜잭션에 영향을 주지 않습니다.

  • 주 비즈니스 트랜잭션을 수정하지 않고도 더 많은 사이드 이펙트를 추가할 수 있습니다.

  • 어떤 도메인이 관여하고 소유권이 어디에 있는지 명확하게 볼 수 있습니다.

  • 시스템에서 어떤 이벤트가 발생하는지 명시적으로 선언되어 있어 식별할 수 있습니다.

Gitlab::EventStore를 사용하더라도 구독자(Sidekiq 워커)와 도메인 이벤트의 스키마 사이에는 여전히 결합이 존재합니다. 이 수준의 결합은 주 트랜잭션(Ci::CreatePipelineService)이 다음에 결합되는 것보다 훨씬 작습니다:

  • 다수의 구독자.

  • 다양한 구독자 호출 방식(조건부 호출 포함).

  • 다양한 파라미터 전달 방식.

graph LR subgraph ci[CI] cp[CreatePipelineService] cp -- publish --> e[PipelineCreateEvent] end

subgraph mr[MergeRequests] upw[UpdateHeadPipelineWorker] end

subgraph no[Namespaces::Onboarding] pow[PipelinesOnboardedWorker] end

upw -. subscribe .-> e pow -. subscribe .-> e

각 구독자는 그 자체로 Sidekiq 워커이므로, 담당하는 작업 유형과 관련된 속성을 지정할 수 있습니다. 예를 들어, 한 구독자는 urgency: high를 정의하고, 덜 중요한 다른 구독자는 urgency: low를 설정할 수 있습니다.

EventStore는 의존성 역전(Dependency Inversion)을 가능하게 해주는 추상화입니다. 이를 통해 비즈니스 트랜잭션과 사이드 이펙트(종종 다른 도메인에서 실행되는)를 분리하는 데 도움이 됩니다.

이벤트가 발행되면, EventStore는 구독된 각 워커에서 perform_async를 호출하여 이벤트 정보를 인수로 전달합니다. 이는 기본적으로 각 구독자의 큐에 Sidekiq job을 예약합니다.

즉, 구독자는 단순히 Sidekiq 워커이므로 구독자가 작동하는 방식에는 다른 변경이 없습니다. 예를 들어, 워커(구독자)가 job 실행에 실패하면 해당 job은 재시도를 위해 Sidekiq으로 되돌아갑니다.

EventStore 장점#

  • 구독자(Sidekiq 워커)는 사이드 이펙트가 중요한 경우 워커 가중치를 변경하여 더 빠르게 실행되도록 설정할 수 있습니다.

  • 사이드 이펙트가 비동기적으로 실행된다는 사실을 자동으로 보장합니다. 이를 통해 다른 도메인이 주 비즈니스 트랜잭션의 성능에 영향을 주지 않고 이벤트를 구독할 수 있습니다.

EventStore 단점#

  • EventStore는 Sidekiq 위에 구축됩니다. Sidekiq 워커는 재시도 및 지수 백오프를 지원하지만, 워커가 재시도 한도를 초과하여 Sidekiq job이 손실되는 경우가 있습니다. 또한, 인시던트 및 재해 복구 과정에서 Sidekiq job이 삭제될 수 있습니다. 많은 중요한 GitLab 기능이 Sidekiq의 내구성을 전제로 하지만, 일부 중요한 데이터 무결성 기능에는 이것이 허용되지 않을 수 있습니다. 작업이 결국 완료되어야 함을 보장해야 한다면, Sidekiq cron 워커가 job을 처리하는 Postgres 큐잉 메커니즘을 구현해야 할 수 있습니다. 이 접근 방식의 예는 ::LooseForeignKeys::CleanupWorker::BatchedGitRefUpdates::ProjectCleanupWorker에서 확인할 수 있습니다. 일반적으로 파티셔닝된 테이블을 생성하고 데이터를 삽입하면, 나중에 cron 워커가 처리하고 작업 후 데이터베이스에서 processed로 표시합니다. ::Elastic::ProcessBookkeepingService에서 사용하는 것과 같이 Redis에서 신뢰할 수 있는 큐를 구현하는 전략도 있습니다. 코드베이스에 새로운 큐잉 패턴을 도입하려면 프로세스 초기에 메인테이너의 조언을 구해야 합니다.

  • 또는 로직이 주 비즈니스 트랜잭션의 일부로 처리되어야 하고 사이드 이펙트가 아닌 경우에는 EventStore 사용을 고려하지 마세요.

  • Sidekiq 워커는 기본적으로 제한이 없지만, 공유 리소스가 포화될 위험이 있는 경우 동시성 제한(concurrency limit) 구성을 고려해야 합니다.

이벤트 정의#

새 이벤트는 레거시 이벤트 대신 Cloud Event만 사용해야 합니다.

Event 객체는 bounded context에서 발생한 도메인 이벤트를 나타냅니다. 프로듀서는 발생한 일을 이벤트로 발행하여 다른 bounded context에 알리고, 이에 반응할 수 있게 합니다. 이벤트는 <domain_object><action>Event로 명명해야 하며, action은 과거 시제를 사용합니다. 예를 들어 AddReviewerEvent 대신 ReviewerAddedEvent입니다. domain_object는 bounded context를 기반으로 명확한 경우 생략할 수 있습니다. 예를 들어 MergeRequest::MergeRequestApprovedEvent 대신 MergeRequest::ApprovedEvent입니다.

좋은 이벤트를 위한 지침#

이벤트는 API나 UI처럼 공개 인터페이스입니다. 제품 및 디자인 담당자와 협력하여 새 이벤트가 구독자의 요구를 충족하는지 확인하세요. 가능한 경우 새 이벤트는 다음 원칙을 따르도록 노력해야 합니다:

  • 의미론적(Semantic): 이벤트는 구독자를 위한 의도된 액션이 아니라, bounded context 내에서 발생한 일을 설명해야 합니다.

  • 구체적(Specific): 이벤트는 지나치게 정밀하지 않으면서 좁게 정의되어야 합니다. 이렇게 하면 구독자가 수행해야 하는 이벤트 필터링의 양과 구독해야 하는 고유 이벤트 수를 최소화합니다. 추가 정보를 전달할 때는 프로퍼티를 사용하는 것을 고려하세요.

  • 범위 지정(Scoped): 이벤트는 해당 bounded context로 범위를 지정해야 합니다. 자신의 bounded context에 포함되지 않은 도메인 객체에 대한 이벤트 발행은 피하세요.

예시#

원칙 좋음 나쁨
의미론적(Semantic) MergeRequest::ApprovedEvent MergeRequest::NotifyAuthorEvent
구체적(Specific) MergeRequest::ReviewerAddedEvent • MergeRequest::ChangedEvent • MergeRequest::CodeownerAddedAsReviewerEvent
범위 지정(Scoped) MergeRequest::CreatedEvent Project::MergeRequestCreatedEvent

Cloud Event 구조#

GitLab Events Platform은 벤더 중립적인 이벤트 데이터 형식인 CloudEvents 명세 v1.0을 기반으로 하는 새로운 유형의 EventStore 이벤트를 도입합니다. 앞으로 EventStore의 모든 이벤트는 Cloud Events를 사용하여 이 새로운 명세를 따라야 합니다.

Cloud Event 페이로드는 다음 JSON 스키마를 따릅니다

{
  'type' => 'object',
  'properties' => {
    'specversion' => { 'type' => 'string' },
    'type' => { 'type' => 'string' },
    'source' => { 'type' => 'string' },
    'id' => { 'type' => 'string' },
    'gitlab_user_id' => { 'type' => 'number' },
    'gitlab_user_username' => { 'type' => 'string' },
    'gitlab_organization_id' => { 'type' => 'number' },
    'time' => { 'type' => 'string', 'format' => 'date-time' },
    'datacontenttype' => { 'type' => 'string' },
    'dataschema' => { 'type' => 'string', 'format' => 'uri' },
    'subject' => { 'type' => 'string' },
    'data' => { 'type' => 'object' } # 이벤트가 커스텀 페이로드를 주입하는 곳입니다
  },
  'required' => required_fields
}

페이로드는 두 부분으로 구성됩니다:

  • Cloud Events 명세를 따르는 외부 필수 페이로드. CloudEvent 클래스에 의해 생성됩니다.

  • data 속성 내의 내부 커스텀 페이로드. 커스텀 이벤트 자체는 data_schema를 통해 이 페이로드의 유효성 검사를 담당합니다. 다음 섹션의 예시를 참고하세요.

Cloud Event 정의#

Cloud Events는 app/events/<namespace>/ 아래에 정의되며 Gitlab::EventStore::CloudEvent를 상속합니다.

이벤트 카테고리와 유형은 클래스 수준 메서드를 사용하여 설정하고, data_schema 메서드와 build 팩토리 메서드를 구현합니다:

module MergeRequests
  class AssignedReviewersEvent < Gitlab::EventStore::CloudEvent
    event_category :merge_requests
    event_type :assigned_reviewers

    class << self
      def build(current_user:, merge_request:, new_reviewers:)
        build_cloud_event(
          source: "projects/#{merge_request.project.id}",
          subject: "merge_requests/#{merge_request.id}",
          current_user: current_user,
          organization: merge_request.project.organization,
          event_data: generate_event_data(merge_request, new_reviewers)
        )
      end

      private

      def generate_event_data(merge_request, new_reviewers)
        {
          merge_request_id: merge_request.id,
          merge_request_iid: merge_request.iid,
          project_id: merge_request.project_id,
          new_reviewers: new_reviewers.map { |r| { id: r.id, user_type: r.user_type } }
        }
      end
    end

    def data_schema
      {
        type: "object",
        required: %w[merge_request_id merge_request_iid project_id new_reviewers],
        properties: {
          merge_request_id: { type: "integer" },
          merge_request_iid: { type: "integer" },
          project_id: { type: "integer" },
          new_reviewers: {
            type: "array",
            items: {
              type: "object",
              required: %w[id user_type],
              properties: {
                id: { type: "integer" },
                user_type: { type: "string" }
              }
            }
          }
        },
        additionalProperties: false
      }
    end
  end
end

event_categoryevent_type 속성은 Cloud Event type 필드 (com.gitlab.<category>.<type>)와 dataschema URL (https://gitlab.com/schemas/<category>/<type>/v1.0)을 결정합니다. 일반적으로 event_category는 도메인에 매핑되고 event_type은 도메인 내의 특정 액션입니다.

data_schema 메서드는 Cloud Event 페이로드의 data 필드를 유효성 검사하는 유효한 JSON 스키마를 반환해야 합니다. 외부 엔벨로프 필드(specversion, type, source 등)는 CloudEvent 클래스에서 자동으로 유효성 검사됩니다.

이벤트 스키마 정의 (레거시)#

새 이벤트는 레거시 이벤트 대신 Cloud Event만 사용해야 합니다. 이 섹션은

레거시 이벤트 유지보수를 위해서만 참고하세요.

새 이벤트 클래스를 app/events/<namespace>/ 아래에 과거에 발생한 일을 나타내는 이름으로 정의합니다:

class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event
  def schema
    {
      'type' => 'object',
      'required' => ['pipeline_id'],
      'properties' => {
        'pipeline_id' => { 'type' => 'integer' },
        'ref' => { 'type' => 'string' }
      }
    }
  end
end

유효한 JSON 스키마이어야 하는 스키마는 JSONSchemer 젬에 의해 유효성 검사됩니다. 유효성 검사는 이벤트 객체를 초기화할 때 즉시 수행되어 발행자가 구독자와의 계약을 따르도록 보장합니다.

스키마 변경에 필요한 롤아웃 횟수를 줄이기 위해 가능한 한 선택적 프로퍼티를 사용해야 합니다. 그러나 required 프로퍼티는 이벤트 주체의 고유 식별자에 사용할 수 있습니다. 예를 들어:

  • pipeline_idCi::PipelineCreatedEvent의 required 프로퍼티가 될 수 있습니다.

  • project_idProjects::ProjectDeletedEvent의 required 프로퍼티가 될 수 있습니다.

특정 구독자에 맞게 페이로드를 조정하지 않고 구독자에게 필요한 프로퍼티만 발행하세요. 페이로드는 이벤트를 완전히 나타내야 하며 느슨하게 관련된 프로퍼티를 포함해서는 안 됩니다. 예를 들어:

event = Ci::PipelineCreatedEvent.new(data: {
  pipeline_id: pipeline.id,
  # 모든 구독자가 머지 리퀘스트 ID를 필요로 하는 것이 아니라면,
  # 이 데이터는 구독자가 직접 가져올 수 있습니다.
  merge_request_ids: pipeline.all_merge_requests.pluck(:id)
})

더 많은 프로퍼티로 이벤트를 발행하면 구독자가 처음부터 필요한 데이터를 갖게 됩니다. 그렇지 않으면 구독자가 데이터베이스에서 추가 데이터를 가져와야 합니다. 그러나 이로 인해 스키마가 지속적으로 변경되고 단일 진실 공급원(Single Source Of Truth, SSOT)을 나타내지 않을 수 있는 프로퍼티가 추가될 수 있습니다. 이 기법은 성능 최적화로 사용하는 것이 가장 좋습니다. 예를 들어, 이벤트에 많은 구독자가 있고 모두 데이터베이스에서 동일한 데이터를 다시 가져오는 경우입니다.

이벤트 업데이트#

스키마 또는 이벤트 이름을 변경하려면 여러 번의 롤아웃이 필요합니다. 새 버전이 배포되는 동안:

  • 기존 발행자는 이전 버전을 사용하여 이벤트를 발행할 수 있습니다.

  • 기존 구독자는 이전 버전을 사용하여 이벤트를 소비할 수 있습니다.

  • 이벤트는 Sidekiq 큐에 job 인수로 지속되므로, 배포 중에 스키마의 두 버전이 공존할 수 있습니다.

스키마 변경은 궁극적으로 Sidekiq 인수에 영향을 미치므로, 여러 롤아웃과 관련하여 Sidekiq 스타일 가이드를 참고하세요.

이벤트 이름 변경#

  • 롤아웃 1: 새 이벤트를 도입하고 구독자를 준비합니다.

새 이름으로 이벤트의 복사본을 도입합니다(이전 이벤트가 새 이벤트를 상속하도록 할 수 있습니다).

  • 구독자 워커가 이벤트 이름을 알고 있다면, 새 이벤트도 처리할 수 있도록 합니다.

  • 롤아웃 2: 새 이벤트를 구독자로 라우팅합니다.

발행자를 새 이벤트를 사용하도록 변경합니다.

  • 이전 이벤트를 사용하던 모든 구독을 새 이벤트를 사용하도록 변경합니다.

  • 이전 이벤트 클래스를 제거합니다.

프로퍼티 추가#

  • 롤아웃 1:

새 프로퍼티를 선택적(required 아님)으로 추가합니다.

  • 구독자를 업데이트하여 새 프로퍼티가 있거나 없는 이벤트를 모두 소비할 수 있도록 합니다.

  • 롤아웃 2:

발행자를 변경하여 새 프로퍼티를 제공합니다.

  • 롤아웃 3: (프로퍼티가 required이어야 하는 경우):

스키마와 구독자 코드를 변경하여 항상 해당 프로퍼티를 기대하도록 합니다.

프로퍼티 제거#

  • 롤아웃 1:

프로퍼티가 required이면 선택적으로 만듭니다.

  • 구독자를 업데이트하여 항상 해당 프로퍼티를 기대하지 않도록 합니다.

  • 롤아웃 2:

이벤트 발행에서 프로퍼티를 제거합니다.

  • 해당 프로퍼티를 처리하는 코드를 구독자에서 제거합니다.

기타 변경사항#

프로퍼티 이름 변경과 같은 기타 변경사항의 경우 동일한 단계를 사용합니다:

  • 이전 프로퍼티 제거

  • 새 프로퍼티 추가

CloudEvent 발행#

이벤트 발행은 Cloud Events와 레거시 EventStore 이벤트가 동일합니다

Gitlab::EventStore.publish(event)

가능하면 이벤트는 관련 Service 클래스에서 디스패치되어야 합니다. 상태 머신 전환과 같이 모델이 이벤트를 발행하도록 허용하는 예외도 있습니다. 예를 들어, 사이드 이펙트 컬렉션을 실행하는 Ci::BuildFinishedWorker를 예약하는 대신 Ci::BuildFinishedEvent를 발행하고 다른 도메인이 비동기적으로 반응하도록 할 수 있습니다.

ActiveRecord 콜백은 도메인 이벤트를 나타내기에는 너무 저수준입니다. 이는 더 많은 데이터베이스 레코드 변경을 나타냅니다. 의미가 있는 경우도 있지만, 그런 예외를 신중하게 고려해야 합니다.

Cloud Event 구독자 생성#

Cloud Event 구독은 레거시 이벤트와 동일한 패턴을 따릅니다. 단, 커스텀 이벤트 데이터에 접근할 때는 event_data 속성을 통해 접근합니다.

module MergeRequests
  class ProcessAssignedReviewersWorker
    include Gitlab::EventStore::Subscriber

    def handle_event(event)
      # 엔벨로프 필드 접근
      user = event.current_user
      organization = event.organization

      # 이벤트별 페이로드 접근
      merge_request_id = event.event_data[:merge_request_id]
      new_reviewers = event.event_data[:new_reviewers]

      # ...
    end
  end
end

레거시 이벤트 구독자 생성#

구독자는 Gitlab::EventStore::Subscriber 모듈을 포함하는 Sidekiq 워커입니다. 이 모듈은 perform 메서드를 처리하고 handle_event 메서드를 통해 이벤트를 안전하게 처리하는 더 나은 추상화를 제공합니다. 예를 들어:

module MergeRequests
  class UpdateHeadPipelineWorker
    include Gitlab::EventStore::Subscriber

    def handle_event(event)
      Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline|
        # ...
      end
    end
  end
end

이벤트에 구독자 등록#

이는 Cloud Events와 레거시 이벤트 모두에 적용됩니다.

워커를 특정 이벤트에 구독하려면 구독 클래스를 생성하고 register 메서드 내에 이벤트를 등록합니다:

구독을 등록할 때 [카나리 배포와의 호환성 보장](/19.1/development/sidekiq/compatibility_across_updates/#adding-new-workers)을 위해

Sidekiq 워커는 이전 배포에서 도입되어야 하거나 피처 플래그를 사용해야 합니다.

module Gitlab
  module EventStore
    module Subscriptions
      class SbomSubscriptions < BaseSubscriptions
        def register
          store.subscribe ::Sbom::ProcessTransferEventsWorker, to: ::Projects::ProjectTransferedEvent,
            if: ->(event) do
              actor = ::Project.actor_from_id(event.data[:project_id])
              Feature.enabled?(:sync_project_archival_status_to_sbom_occurrences, actor)
            end
        end
      end
    end
  end
end

bounded context마다 별도의 구독 클래스가 있어야 합니다. 구독은 이벤트가 아닌 워커의 bounded context를 사용하여 구성됩니다.

  • FOSS 워커는 lib/gitlab/bounded_contexts/subscriptions/[context]_subscriptions.rb에 구독을 생성해야 합니다.

  • 도메인이 FOSS와 EE 모두에 존재하는 경우, EE 코드는 ee/lib/ee/gitlab/event_store/subscriptions/[context]_subscriptions.rb에 배치해야 합니다.

  • EE 전용 도메인은 ee/lib/gitlab/event_store/subscriptions/[context]_subscriptions.rb에 배치해야 합니다.

새로 생성된 구독 그룹은 lib/gitlab/event_store.rbSUBSCRIPTION_GROUPS 상수 또는 ee/lib/ee/gitlab/event_store.rbEE_SUBSCRIPTION_GROUPS에 추가해야 합니다.

워커가 구독되었는지 확인하는 테스트는 구독자 테스트에 따라 워커의 spec 파일에 배치해야 합니다.

구독은 Rails 앱이 로드될 때 메모리에 저장되며 즉시 동결됩니다. 런타임에 구독을 수정하는 것은 불가능합니다.

이벤트의 조건부 디스패치#

구독은 이벤트를 수락할 조건을 지정할 수 있습니다:

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  if: -> (event) { event.data[:merge_request_id].present? }

이는 조건이 충족될 때 이벤트 스토어가 Ci::PipelineCreatedEvent를 구독자에게 디스패치하도록 합니다.

이 기법은 구독자가 이벤트의 소규모 서브셋에만 관심이 있는 경우 Sidekiq job 예약을 피할 수 있습니다.

조건부 디스패치를 사용할 때는 주어진 이벤트가 발행될 때마다 동기적으로 실행되므로

저렴한 조건만 포함해야 합니다.

복잡한 조건의 경우 모든 이벤트를 구독하고 구독자 워커의 handle_event 메서드에서 로직을 처리하는 것이 좋습니다.

이벤트의 지연 디스패치#

구독은 이벤트를 수신할 지연 시간을 지정할 수 있습니다:

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  delay: 1.minute

delay 파라미터는 구독자 Sidekiq 워커에서 perform_async 대신 perform_in 메서드를 사용하도록 이벤트 디스패치를 전환합니다.

이 기법은 많은 이벤트를 발행하고 Sidekiq 중복 제거를 활용할 때 유용합니다.

이벤트 그룹 발행#

일부 시나리오에서는 단일 비즈니스 트랜잭션에서 동일한 유형의 여러 이벤트를 발행합니다. 이는 각 이벤트마다 job을 호출하여 Sidekiq에 추가 부하를 줍니다. 이런 경우 Gitlab::EventStore.publish_group을 호출하여 이벤트 그룹을 발행할 수 있습니다. 이 메서드는 동일한 유형의 이벤트 배열을 받습니다. 기본적으로 구독자 워커는 최대 10개의 이벤트 그룹을 받지만, 구독을 생성할 때 group_size 파라미터를 정의하여 구성할 수 있습니다. 발행된 이벤트 수는 구성된 group_size에 따라 배치로 구독자에게 디스패치됩니다. 그룹 수가 100을 초과하면, Sidekiq의 부하를 줄이기 위해 각 그룹을 10초 지연으로 예약합니다.

store.subscribe ::Security::RefreshProjectPoliciesWorker,
  to: ::ProjectAuthorizations::AuthorizationsChangedEvent,
  delay: 1.minute,
  group_size: 25

구독자 워커의 handle_event 메서드는 그룹의 각 이벤트에 대해 호출됩니다.

구독자 제거#

Gitlab::EventStore는 Sidekiq을 기반으로 하므로, 다음으로 시작하는 Sidekiq 워커 제거 가이드를 따릅니다:

  • job을 큐에 넣는 코드를 제거하기 위해 구독을 먼저 제거합니다.

  • 구독자 워커를 no-op으로 만듭니다. 이를 위해 워커에서 Gitlab::EventStore::Subscriber 모듈을 제거해야 합니다.

테스트#

Cloud Event 클래스 테스트#

data_schema를 유효성 검사하려면 'a cloud event with schema' 공유 예시를 사용하세요:

RSpec.describe MergeRequests::AssignedReviewersEvent, feature_category: :code_review_workflow do
  it_behaves_like 'a cloud event with schema',
    valid_data: {
      merge_request_id: 1,
      merge_request_iid: 10,
      project_id: 100,
      new_reviewers: [{ id: 1, user_type: "human" }]
    },
    missing_required: %i[merge_request_id merge_request_iid project_id new_reviewers],
    invalid_types: {
      merge_request_id: 'not_an_integer',
      new_reviewers: 'not_an_array'
    }
end

레거시 Event 클래스 테스트#

schema를 유효성 검사하려면 'an event with schema' 공유 예시를 사용하세요:

RSpec.describe MergeRequests::ApprovedEvent, feature_category: :code_review_workflow do
  it_behaves_like 'an event with schema',
    valid_data: { current_user_id: 1, merge_request_id: 2 },
    missing_required: %i[current_user_id merge_request_id],
    invalid_types: {
      current_user_id: 'not_an_integer',
      merge_request_id: 'not_an_integer',
      approved_at: 'not-a-date'
    }

  describe '#schema' do
    context 'with valid optional approved_at' do
      it 'accepts a date-time string' do
        data = { current_user_id: 1, merge_request_id: 2, approved_at: '2024-01-10T12:00:00Z' }

        expect { described_class.new(data: data) }.not_to raise_error
      end
    end
  end
end

발행자 테스트#

발행자의 역할은 이벤트가 올바르게 발행되었는지 확인하는 것입니다.

이벤트가 올바르게 발행되었는지 테스트하려면 RSpec 매처 :publish_event를 사용할 수 있습니다:

it 'publishes a ProjectDeleted event with project id and namespace id' do
  expected_data = { project_id: project.id, namespace_id: project.namespace_id }

  # 매처는 블록이 호출될 때 블록이 예상된 이벤트와 데이터를 발행하는지 확인합니다.
  expect { destroy_project(project, user, {}) }
    .to publish_event(Projects::ProjectDeletedEvent)
    .with(expected_data)
end

:publish_event 매처 내에서 매처를 조합하는 것도 가능합니다. 이는 특정 종류의 값으로 이벤트가 생성되었는지 확인하고 싶지만 값을 미리 알 수 없는 경우에 유용합니다. 이러한 예는 새 레코드를 생성한 후 이벤트를 발행할 때입니다.

it 'publishes a ProjectCreatedEvent with project id and namespace id' do
  # 프로젝트 ID는 expect 블록에서 `create_project`가
  # 호출될 때만 생성됩니다.
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }

  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
end

여러 이벤트를 발행할 때 발행되지 않은 이벤트도 확인할 수 있습니다.

it 'publishes a ProjectCreatedEvent with project id and namespace id' do
  # 프로젝트 ID는 `expect` 블록에서 `create_project`가
  # 호출될 때 생성됩니다.
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }

  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
    .and not_publish_event(Projects::ProjectDeletedEvent)
end

구독자 테스트#

구독자는 발행된 이벤트가 올바르게 소비될 수 있는지 확인해야 합니다. 이를 위해 구독자를 테스트하는 방식을 표준화하는 헬퍼와 공유 예시를 추가했습니다:

RSpec.describe MergeRequests::UpdateHeadPipelineWorker do
  let(:pipeline_created_event) { Ci::PipelineCreatedEvent.new(data: ({ pipeline_id: pipeline.id })) }

  # 이 공유 예시는 이벤트가 발행되고 현재 구독자(`described_class`)에 의해
  # 올바르게 처리되는지 확인합니다. 또한 워커가 멱등성을 가지는지 확인합니다.
  it_behaves_like 'subscribes to event' do
    let(:event) { pipeline_created_event }
  end

  # 이 공유 예시는 발행된 이벤트가 무시되는지 확인합니다. 이는
  # 조건부 디스패치 테스트에 유용할 수 있습니다.
  it_behaves_like 'ignores the published event' do
    let(:event) { pipeline_created_event }
  end

  it 'does something' do
    # 이 헬퍼는 `handle_event`가 올바르게 호출되는지 확인하면서 `perform`을 직접 실행합니다.
    consume_event(subscriber: described_class, event: pipeline_created_event)

    # 기대값 실행
  end
end

모범 사례#

이벤트가 항상 발생하는 코드(CE 또는 EE)와 동일한 위치에 이벤트 클래스를 정의하고 이벤트를 발행합니다.

이벤트가 CE 기능의 결과로 발생하는 경우, 이벤트 클래스는 CE에서 정의되고 발행되어야 합니다. 마찬가지로 이벤트가 EE 기능의 결과로 발생하는 경우, 이벤트 클래스는 EE에서 정의되고 발행되어야 합니다.

  • 이벤트에 의존하는 구독자를 종속 기능이 존재하는 동일한 코드(CE 또는 EE)에 정의합니다.

CE에서 발행된 이벤트(예: Projects::ProjectCreatedEvent)와 EE에서 정의된 이 이벤트에 의존하는 구독자(예: Security::SyncSecurityPolicyWorker)가 있을 수 있습니다.

  • 이벤트 클래스를 정의하고 동일한 bounded context(최상위 Ruby 네임스페이스) 내에서 이벤트를 발행합니다.

주어진 bounded context는 자체 컨텍스트와 관련된 이벤트만 발행해야 합니다.

  • 이벤트를 구독할 때 신호 대 잡음 비율을 평가합니다. 구독자 내에서 처리하는 이벤트 대비 무시하는 이벤트는 얼마나 되나요? 이벤트의 소규모 서브셋에만 관심이 있는 경우 조건부 디스패치 사용을 고려하세요. 조건부 디스패치로 동기 검사를 실행하는 것과 잠재적으로 중복된 워커를 예약하는 것 사이에서 균형을 잡으세요.

GitLab EventStore

GitLab v19.1
원문 보기
요약

GitLab 모노리스 프로젝트는 점점 커지고 있으며, 더 많은 도메인이 정의되고 있습니다. 대표적인 예로 PostReceive 워커가 있습니다. 단일 책임 원칙(Single Responsibility Principle)을 위반합니다.

배경#

GitLab 모노리스 프로젝트는 점점 커지고 있으며, 더 많은 도메인이 정의되고 있습니다. 그 결과, 이러한 도메인들은 시간적 결합(temporal coupling)으로 인해 서로 얽히게 됩니다.

대표적인 예로 PostReceive 워커가 있습니다. 이 워커에서는 여러 도메인에 걸쳐 많은 작업이 이루어집니다. 새 커밋 푸시에 반응하는 새로운 동작이 필요하면, PostReceive 또는 하위 컴포넌트(예: Git::ProcessRefChangesService)에 코드를 추가하게 됩니다.

이러한 아키텍처는:

  • 단일 책임 원칙(Single Responsibility Principle)을 위반합니다.

  • 익숙하지 않은 코드베이스에 코드를 추가하는 위험을 증가시킵니다. 알지 못하는 미묘한 부분이 있을 수 있으며, 이로 인해 버그나 성능 저하가 발생할 수 있습니다.

  • 도메인 경계를 위반합니다. 특정 네임스페이스(예: Git::) 내부에서 다른 도메인의 클래스(Ci:: 또는 MergeRequests::)가 불쑥 등장합니다.

EventStore란?#

Gitlab::EventStore는 기존의 Sidekiq 워커와 현재의 옵저버빌리티(observability) 위에 구축된 기본적인 pub-sub 시스템입니다. 이 시스템을 사용하여 도메인을 모델링할 때 이벤트 기반 접근 방식을 적용하면서 결합도를 최소화합니다.

이 방식은 기존 Sidekiq 워커를 그대로 유지하면서 비동기 작업을 수행하지만, 의존성의 방향을 역전시킵니다.

EventStore 예시#

CI 파이프라인이 생성될 때, 파이프라인의 ref와 일치하는 모든 머지 리퀘스트의 헤드 파이프라인을 업데이트합니다. 그러면 머지 리퀘스트에서 최신 파이프라인의 상태를 표시할 수 있습니다.

EventStore 없이#

Ci::CreatePipelineService를 변경하고 파이프라인이 생성되었는지 확인하는 로직(예: if 문)을 추가합니다. 그런 다음, MergeRequests:: 도메인의 사이드 이펙트를 실행할 워커를 예약합니다.

이 방식은 개방-폐쇄 원칙(Open-Closed Principle)을 위반하며 다른 도메인의 사이드 이펙트 로직을 불필요하게 추가하여 결합도를 높입니다:

graph LR subgraph ci[CI] cp[CreatePipelineService] end

subgraph mr[MergeRequests] upw[UpdateHeadPipelineWorker] end

subgraph no[Namespaces::Onboarding] pow[PipelinesOnboardedWorker] end

cp -- perform_async --> upw cp -- perform_async --> pow

EventStore 사용 시#

Ci::CreatePipelineServiceCi::PipelineCreatedEvent 이벤트를 발행하고, 역할이 여기서 끝납니다.

MergeRequests:: 도메인은 워커 MergeRequests::UpdateHeadPipelineWorker로 이 이벤트를 구독할 수 있으므로:

  • 사이드 이펙트는 비동기적으로 예약되어, 도메인 이벤트를 발행하는 주 비즈니스 트랜잭션에 영향을 주지 않습니다.

  • 주 비즈니스 트랜잭션을 수정하지 않고도 더 많은 사이드 이펙트를 추가할 수 있습니다.

  • 어떤 도메인이 관여하고 소유권이 어디에 있는지 명확하게 볼 수 있습니다.

  • 시스템에서 어떤 이벤트가 발생하는지 명시적으로 선언되어 있어 식별할 수 있습니다.

Gitlab::EventStore를 사용하더라도 구독자(Sidekiq 워커)와 도메인 이벤트의 스키마 사이에는 여전히 결합이 존재합니다. 이 수준의 결합은 주 트랜잭션(Ci::CreatePipelineService)이 다음에 결합되는 것보다 훨씬 작습니다:

  • 다수의 구독자.

  • 다양한 구독자 호출 방식(조건부 호출 포함).

  • 다양한 파라미터 전달 방식.

graph LR subgraph ci[CI] cp[CreatePipelineService] cp -- publish --> e[PipelineCreateEvent] end

subgraph mr[MergeRequests] upw[UpdateHeadPipelineWorker] end

subgraph no[Namespaces::Onboarding] pow[PipelinesOnboardedWorker] end

upw -. subscribe .-> e pow -. subscribe .-> e

각 구독자는 그 자체로 Sidekiq 워커이므로, 담당하는 작업 유형과 관련된 속성을 지정할 수 있습니다. 예를 들어, 한 구독자는 urgency: high를 정의하고, 덜 중요한 다른 구독자는 urgency: low를 설정할 수 있습니다.

EventStore는 의존성 역전(Dependency Inversion)을 가능하게 해주는 추상화입니다. 이를 통해 비즈니스 트랜잭션과 사이드 이펙트(종종 다른 도메인에서 실행되는)를 분리하는 데 도움이 됩니다.

이벤트가 발행되면, EventStore는 구독된 각 워커에서 perform_async를 호출하여 이벤트 정보를 인수로 전달합니다. 이는 기본적으로 각 구독자의 큐에 Sidekiq job을 예약합니다.

즉, 구독자는 단순히 Sidekiq 워커이므로 구독자가 작동하는 방식에는 다른 변경이 없습니다. 예를 들어, 워커(구독자)가 job 실행에 실패하면 해당 job은 재시도를 위해 Sidekiq으로 되돌아갑니다.

EventStore 장점#

  • 구독자(Sidekiq 워커)는 사이드 이펙트가 중요한 경우 워커 가중치를 변경하여 더 빠르게 실행되도록 설정할 수 있습니다.

  • 사이드 이펙트가 비동기적으로 실행된다는 사실을 자동으로 보장합니다. 이를 통해 다른 도메인이 주 비즈니스 트랜잭션의 성능에 영향을 주지 않고 이벤트를 구독할 수 있습니다.

EventStore 단점#

  • EventStore는 Sidekiq 위에 구축됩니다. Sidekiq 워커는 재시도 및 지수 백오프를 지원하지만, 워커가 재시도 한도를 초과하여 Sidekiq job이 손실되는 경우가 있습니다. 또한, 인시던트 및 재해 복구 과정에서 Sidekiq job이 삭제될 수 있습니다. 많은 중요한 GitLab 기능이 Sidekiq의 내구성을 전제로 하지만, 일부 중요한 데이터 무결성 기능에는 이것이 허용되지 않을 수 있습니다. 작업이 결국 완료되어야 함을 보장해야 한다면, Sidekiq cron 워커가 job을 처리하는 Postgres 큐잉 메커니즘을 구현해야 할 수 있습니다. 이 접근 방식의 예는 ::LooseForeignKeys::CleanupWorker::BatchedGitRefUpdates::ProjectCleanupWorker에서 확인할 수 있습니다. 일반적으로 파티셔닝된 테이블을 생성하고 데이터를 삽입하면, 나중에 cron 워커가 처리하고 작업 후 데이터베이스에서 processed로 표시합니다. ::Elastic::ProcessBookkeepingService에서 사용하는 것과 같이 Redis에서 신뢰할 수 있는 큐를 구현하는 전략도 있습니다. 코드베이스에 새로운 큐잉 패턴을 도입하려면 프로세스 초기에 메인테이너의 조언을 구해야 합니다.

  • 또는 로직이 주 비즈니스 트랜잭션의 일부로 처리되어야 하고 사이드 이펙트가 아닌 경우에는 EventStore 사용을 고려하지 마세요.

  • Sidekiq 워커는 기본적으로 제한이 없지만, 공유 리소스가 포화될 위험이 있는 경우 동시성 제한(concurrency limit) 구성을 고려해야 합니다.

이벤트 정의#

새 이벤트는 레거시 이벤트 대신 Cloud Event만 사용해야 합니다.

Event 객체는 bounded context에서 발생한 도메인 이벤트를 나타냅니다. 프로듀서는 발생한 일을 이벤트로 발행하여 다른 bounded context에 알리고, 이에 반응할 수 있게 합니다. 이벤트는 <domain_object><action>Event로 명명해야 하며, action은 과거 시제를 사용합니다. 예를 들어 AddReviewerEvent 대신 ReviewerAddedEvent입니다. domain_object는 bounded context를 기반으로 명확한 경우 생략할 수 있습니다. 예를 들어 MergeRequest::MergeRequestApprovedEvent 대신 MergeRequest::ApprovedEvent입니다.

좋은 이벤트를 위한 지침#

이벤트는 API나 UI처럼 공개 인터페이스입니다. 제품 및 디자인 담당자와 협력하여 새 이벤트가 구독자의 요구를 충족하는지 확인하세요. 가능한 경우 새 이벤트는 다음 원칙을 따르도록 노력해야 합니다:

  • 의미론적(Semantic): 이벤트는 구독자를 위한 의도된 액션이 아니라, bounded context 내에서 발생한 일을 설명해야 합니다.

  • 구체적(Specific): 이벤트는 지나치게 정밀하지 않으면서 좁게 정의되어야 합니다. 이렇게 하면 구독자가 수행해야 하는 이벤트 필터링의 양과 구독해야 하는 고유 이벤트 수를 최소화합니다. 추가 정보를 전달할 때는 프로퍼티를 사용하는 것을 고려하세요.

  • 범위 지정(Scoped): 이벤트는 해당 bounded context로 범위를 지정해야 합니다. 자신의 bounded context에 포함되지 않은 도메인 객체에 대한 이벤트 발행은 피하세요.

예시#

원칙 좋음 나쁨
의미론적(Semantic) MergeRequest::ApprovedEvent MergeRequest::NotifyAuthorEvent
구체적(Specific) MergeRequest::ReviewerAddedEvent • MergeRequest::ChangedEvent • MergeRequest::CodeownerAddedAsReviewerEvent
범위 지정(Scoped) MergeRequest::CreatedEvent Project::MergeRequestCreatedEvent

Cloud Event 구조#

GitLab Events Platform은 벤더 중립적인 이벤트 데이터 형식인 CloudEvents 명세 v1.0을 기반으로 하는 새로운 유형의 EventStore 이벤트를 도입합니다. 앞으로 EventStore의 모든 이벤트는 Cloud Events를 사용하여 이 새로운 명세를 따라야 합니다.

Cloud Event 페이로드는 다음 JSON 스키마를 따릅니다

{
  'type' => 'object',
  'properties' => {
    'specversion' => { 'type' => 'string' },
    'type' => { 'type' => 'string' },
    'source' => { 'type' => 'string' },
    'id' => { 'type' => 'string' },
    'gitlab_user_id' => { 'type' => 'number' },
    'gitlab_user_username' => { 'type' => 'string' },
    'gitlab_organization_id' => { 'type' => 'number' },
    'time' => { 'type' => 'string', 'format' => 'date-time' },
    'datacontenttype' => { 'type' => 'string' },
    'dataschema' => { 'type' => 'string', 'format' => 'uri' },
    'subject' => { 'type' => 'string' },
    'data' => { 'type' => 'object' } # 이벤트가 커스텀 페이로드를 주입하는 곳입니다
  },
  'required' => required_fields
}

페이로드는 두 부분으로 구성됩니다:

  • Cloud Events 명세를 따르는 외부 필수 페이로드. CloudEvent 클래스에 의해 생성됩니다.

  • data 속성 내의 내부 커스텀 페이로드. 커스텀 이벤트 자체는 data_schema를 통해 이 페이로드의 유효성 검사를 담당합니다. 다음 섹션의 예시를 참고하세요.

Cloud Event 정의#

Cloud Events는 app/events/<namespace>/ 아래에 정의되며 Gitlab::EventStore::CloudEvent를 상속합니다.

이벤트 카테고리와 유형은 클래스 수준 메서드를 사용하여 설정하고, data_schema 메서드와 build 팩토리 메서드를 구현합니다:

module MergeRequests
  class AssignedReviewersEvent < Gitlab::EventStore::CloudEvent
    event_category :merge_requests
    event_type :assigned_reviewers

    class << self
      def build(current_user:, merge_request:, new_reviewers:)
        build_cloud_event(
          source: "projects/#{merge_request.project.id}",
          subject: "merge_requests/#{merge_request.id}",
          current_user: current_user,
          organization: merge_request.project.organization,
          event_data: generate_event_data(merge_request, new_reviewers)
        )
      end

      private

      def generate_event_data(merge_request, new_reviewers)
        {
          merge_request_id: merge_request.id,
          merge_request_iid: merge_request.iid,
          project_id: merge_request.project_id,
          new_reviewers: new_reviewers.map { |r| { id: r.id, user_type: r.user_type } }
        }
      end
    end

    def data_schema
      {
        type: "object",
        required: %w[merge_request_id merge_request_iid project_id new_reviewers],
        properties: {
          merge_request_id: { type: "integer" },
          merge_request_iid: { type: "integer" },
          project_id: { type: "integer" },
          new_reviewers: {
            type: "array",
            items: {
              type: "object",
              required: %w[id user_type],
              properties: {
                id: { type: "integer" },
                user_type: { type: "string" }
              }
            }
          }
        },
        additionalProperties: false
      }
    end
  end
end

event_categoryevent_type 속성은 Cloud Event type 필드 (com.gitlab.<category>.<type>)와 dataschema URL (https://gitlab.com/schemas/<category>/<type>/v1.0)을 결정합니다. 일반적으로 event_category는 도메인에 매핑되고 event_type은 도메인 내의 특정 액션입니다.

data_schema 메서드는 Cloud Event 페이로드의 data 필드를 유효성 검사하는 유효한 JSON 스키마를 반환해야 합니다. 외부 엔벨로프 필드(specversion, type, source 등)는 CloudEvent 클래스에서 자동으로 유효성 검사됩니다.

이벤트 스키마 정의 (레거시)#

새 이벤트는 레거시 이벤트 대신 Cloud Event만 사용해야 합니다. 이 섹션은

레거시 이벤트 유지보수를 위해서만 참고하세요.

새 이벤트 클래스를 app/events/<namespace>/ 아래에 과거에 발생한 일을 나타내는 이름으로 정의합니다:

class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event
  def schema
    {
      'type' => 'object',
      'required' => ['pipeline_id'],
      'properties' => {
        'pipeline_id' => { 'type' => 'integer' },
        'ref' => { 'type' => 'string' }
      }
    }
  end
end

유효한 JSON 스키마이어야 하는 스키마는 JSONSchemer 젬에 의해 유효성 검사됩니다. 유효성 검사는 이벤트 객체를 초기화할 때 즉시 수행되어 발행자가 구독자와의 계약을 따르도록 보장합니다.

스키마 변경에 필요한 롤아웃 횟수를 줄이기 위해 가능한 한 선택적 프로퍼티를 사용해야 합니다. 그러나 required 프로퍼티는 이벤트 주체의 고유 식별자에 사용할 수 있습니다. 예를 들어:

  • pipeline_idCi::PipelineCreatedEvent의 required 프로퍼티가 될 수 있습니다.

  • project_idProjects::ProjectDeletedEvent의 required 프로퍼티가 될 수 있습니다.

특정 구독자에 맞게 페이로드를 조정하지 않고 구독자에게 필요한 프로퍼티만 발행하세요. 페이로드는 이벤트를 완전히 나타내야 하며 느슨하게 관련된 프로퍼티를 포함해서는 안 됩니다. 예를 들어:

event = Ci::PipelineCreatedEvent.new(data: {
  pipeline_id: pipeline.id,
  # 모든 구독자가 머지 리퀘스트 ID를 필요로 하는 것이 아니라면,
  # 이 데이터는 구독자가 직접 가져올 수 있습니다.
  merge_request_ids: pipeline.all_merge_requests.pluck(:id)
})

더 많은 프로퍼티로 이벤트를 발행하면 구독자가 처음부터 필요한 데이터를 갖게 됩니다. 그렇지 않으면 구독자가 데이터베이스에서 추가 데이터를 가져와야 합니다. 그러나 이로 인해 스키마가 지속적으로 변경되고 단일 진실 공급원(Single Source Of Truth, SSOT)을 나타내지 않을 수 있는 프로퍼티가 추가될 수 있습니다. 이 기법은 성능 최적화로 사용하는 것이 가장 좋습니다. 예를 들어, 이벤트에 많은 구독자가 있고 모두 데이터베이스에서 동일한 데이터를 다시 가져오는 경우입니다.

이벤트 업데이트#

스키마 또는 이벤트 이름을 변경하려면 여러 번의 롤아웃이 필요합니다. 새 버전이 배포되는 동안:

  • 기존 발행자는 이전 버전을 사용하여 이벤트를 발행할 수 있습니다.

  • 기존 구독자는 이전 버전을 사용하여 이벤트를 소비할 수 있습니다.

  • 이벤트는 Sidekiq 큐에 job 인수로 지속되므로, 배포 중에 스키마의 두 버전이 공존할 수 있습니다.

스키마 변경은 궁극적으로 Sidekiq 인수에 영향을 미치므로, 여러 롤아웃과 관련하여 Sidekiq 스타일 가이드를 참고하세요.

이벤트 이름 변경#

  • 롤아웃 1: 새 이벤트를 도입하고 구독자를 준비합니다.

새 이름으로 이벤트의 복사본을 도입합니다(이전 이벤트가 새 이벤트를 상속하도록 할 수 있습니다).

  • 구독자 워커가 이벤트 이름을 알고 있다면, 새 이벤트도 처리할 수 있도록 합니다.

  • 롤아웃 2: 새 이벤트를 구독자로 라우팅합니다.

발행자를 새 이벤트를 사용하도록 변경합니다.

  • 이전 이벤트를 사용하던 모든 구독을 새 이벤트를 사용하도록 변경합니다.

  • 이전 이벤트 클래스를 제거합니다.

프로퍼티 추가#

  • 롤아웃 1:

새 프로퍼티를 선택적(required 아님)으로 추가합니다.

  • 구독자를 업데이트하여 새 프로퍼티가 있거나 없는 이벤트를 모두 소비할 수 있도록 합니다.

  • 롤아웃 2:

발행자를 변경하여 새 프로퍼티를 제공합니다.

  • 롤아웃 3: (프로퍼티가 required이어야 하는 경우):

스키마와 구독자 코드를 변경하여 항상 해당 프로퍼티를 기대하도록 합니다.

프로퍼티 제거#

  • 롤아웃 1:

프로퍼티가 required이면 선택적으로 만듭니다.

  • 구독자를 업데이트하여 항상 해당 프로퍼티를 기대하지 않도록 합니다.

  • 롤아웃 2:

이벤트 발행에서 프로퍼티를 제거합니다.

  • 해당 프로퍼티를 처리하는 코드를 구독자에서 제거합니다.

기타 변경사항#

프로퍼티 이름 변경과 같은 기타 변경사항의 경우 동일한 단계를 사용합니다:

  • 이전 프로퍼티 제거

  • 새 프로퍼티 추가

CloudEvent 발행#

이벤트 발행은 Cloud Events와 레거시 EventStore 이벤트가 동일합니다

Gitlab::EventStore.publish(event)

가능하면 이벤트는 관련 Service 클래스에서 디스패치되어야 합니다. 상태 머신 전환과 같이 모델이 이벤트를 발행하도록 허용하는 예외도 있습니다. 예를 들어, 사이드 이펙트 컬렉션을 실행하는 Ci::BuildFinishedWorker를 예약하는 대신 Ci::BuildFinishedEvent를 발행하고 다른 도메인이 비동기적으로 반응하도록 할 수 있습니다.

ActiveRecord 콜백은 도메인 이벤트를 나타내기에는 너무 저수준입니다. 이는 더 많은 데이터베이스 레코드 변경을 나타냅니다. 의미가 있는 경우도 있지만, 그런 예외를 신중하게 고려해야 합니다.

Cloud Event 구독자 생성#

Cloud Event 구독은 레거시 이벤트와 동일한 패턴을 따릅니다. 단, 커스텀 이벤트 데이터에 접근할 때는 event_data 속성을 통해 접근합니다.

module MergeRequests
  class ProcessAssignedReviewersWorker
    include Gitlab::EventStore::Subscriber

    def handle_event(event)
      # 엔벨로프 필드 접근
      user = event.current_user
      organization = event.organization

      # 이벤트별 페이로드 접근
      merge_request_id = event.event_data[:merge_request_id]
      new_reviewers = event.event_data[:new_reviewers]

      # ...
    end
  end
end

레거시 이벤트 구독자 생성#

구독자는 Gitlab::EventStore::Subscriber 모듈을 포함하는 Sidekiq 워커입니다. 이 모듈은 perform 메서드를 처리하고 handle_event 메서드를 통해 이벤트를 안전하게 처리하는 더 나은 추상화를 제공합니다. 예를 들어:

module MergeRequests
  class UpdateHeadPipelineWorker
    include Gitlab::EventStore::Subscriber

    def handle_event(event)
      Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline|
        # ...
      end
    end
  end
end

이벤트에 구독자 등록#

이는 Cloud Events와 레거시 이벤트 모두에 적용됩니다.

워커를 특정 이벤트에 구독하려면 구독 클래스를 생성하고 register 메서드 내에 이벤트를 등록합니다:

구독을 등록할 때 [카나리 배포와의 호환성 보장](/19.1/development/sidekiq/compatibility_across_updates/#adding-new-workers)을 위해

Sidekiq 워커는 이전 배포에서 도입되어야 하거나 피처 플래그를 사용해야 합니다.

module Gitlab
  module EventStore
    module Subscriptions
      class SbomSubscriptions < BaseSubscriptions
        def register
          store.subscribe ::Sbom::ProcessTransferEventsWorker, to: ::Projects::ProjectTransferedEvent,
            if: ->(event) do
              actor = ::Project.actor_from_id(event.data[:project_id])
              Feature.enabled?(:sync_project_archival_status_to_sbom_occurrences, actor)
            end
        end
      end
    end
  end
end

bounded context마다 별도의 구독 클래스가 있어야 합니다. 구독은 이벤트가 아닌 워커의 bounded context를 사용하여 구성됩니다.

  • FOSS 워커는 lib/gitlab/bounded_contexts/subscriptions/[context]_subscriptions.rb에 구독을 생성해야 합니다.

  • 도메인이 FOSS와 EE 모두에 존재하는 경우, EE 코드는 ee/lib/ee/gitlab/event_store/subscriptions/[context]_subscriptions.rb에 배치해야 합니다.

  • EE 전용 도메인은 ee/lib/gitlab/event_store/subscriptions/[context]_subscriptions.rb에 배치해야 합니다.

새로 생성된 구독 그룹은 lib/gitlab/event_store.rbSUBSCRIPTION_GROUPS 상수 또는 ee/lib/ee/gitlab/event_store.rbEE_SUBSCRIPTION_GROUPS에 추가해야 합니다.

워커가 구독되었는지 확인하는 테스트는 구독자 테스트에 따라 워커의 spec 파일에 배치해야 합니다.

구독은 Rails 앱이 로드될 때 메모리에 저장되며 즉시 동결됩니다. 런타임에 구독을 수정하는 것은 불가능합니다.

이벤트의 조건부 디스패치#

구독은 이벤트를 수락할 조건을 지정할 수 있습니다:

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  if: -> (event) { event.data[:merge_request_id].present? }

이는 조건이 충족될 때 이벤트 스토어가 Ci::PipelineCreatedEvent를 구독자에게 디스패치하도록 합니다.

이 기법은 구독자가 이벤트의 소규모 서브셋에만 관심이 있는 경우 Sidekiq job 예약을 피할 수 있습니다.

조건부 디스패치를 사용할 때는 주어진 이벤트가 발행될 때마다 동기적으로 실행되므로

저렴한 조건만 포함해야 합니다.

복잡한 조건의 경우 모든 이벤트를 구독하고 구독자 워커의 handle_event 메서드에서 로직을 처리하는 것이 좋습니다.

이벤트의 지연 디스패치#

구독은 이벤트를 수신할 지연 시간을 지정할 수 있습니다:

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  delay: 1.minute

delay 파라미터는 구독자 Sidekiq 워커에서 perform_async 대신 perform_in 메서드를 사용하도록 이벤트 디스패치를 전환합니다.

이 기법은 많은 이벤트를 발행하고 Sidekiq 중복 제거를 활용할 때 유용합니다.

이벤트 그룹 발행#

일부 시나리오에서는 단일 비즈니스 트랜잭션에서 동일한 유형의 여러 이벤트를 발행합니다. 이는 각 이벤트마다 job을 호출하여 Sidekiq에 추가 부하를 줍니다. 이런 경우 Gitlab::EventStore.publish_group을 호출하여 이벤트 그룹을 발행할 수 있습니다. 이 메서드는 동일한 유형의 이벤트 배열을 받습니다. 기본적으로 구독자 워커는 최대 10개의 이벤트 그룹을 받지만, 구독을 생성할 때 group_size 파라미터를 정의하여 구성할 수 있습니다. 발행된 이벤트 수는 구성된 group_size에 따라 배치로 구독자에게 디스패치됩니다. 그룹 수가 100을 초과하면, Sidekiq의 부하를 줄이기 위해 각 그룹을 10초 지연으로 예약합니다.

store.subscribe ::Security::RefreshProjectPoliciesWorker,
  to: ::ProjectAuthorizations::AuthorizationsChangedEvent,
  delay: 1.minute,
  group_size: 25

구독자 워커의 handle_event 메서드는 그룹의 각 이벤트에 대해 호출됩니다.

구독자 제거#

Gitlab::EventStore는 Sidekiq을 기반으로 하므로, 다음으로 시작하는 Sidekiq 워커 제거 가이드를 따릅니다:

  • job을 큐에 넣는 코드를 제거하기 위해 구독을 먼저 제거합니다.

  • 구독자 워커를 no-op으로 만듭니다. 이를 위해 워커에서 Gitlab::EventStore::Subscriber 모듈을 제거해야 합니다.

테스트#

Cloud Event 클래스 테스트#

data_schema를 유효성 검사하려면 'a cloud event with schema' 공유 예시를 사용하세요:

RSpec.describe MergeRequests::AssignedReviewersEvent, feature_category: :code_review_workflow do
  it_behaves_like 'a cloud event with schema',
    valid_data: {
      merge_request_id: 1,
      merge_request_iid: 10,
      project_id: 100,
      new_reviewers: [{ id: 1, user_type: "human" }]
    },
    missing_required: %i[merge_request_id merge_request_iid project_id new_reviewers],
    invalid_types: {
      merge_request_id: 'not_an_integer',
      new_reviewers: 'not_an_array'
    }
end

레거시 Event 클래스 테스트#

schema를 유효성 검사하려면 'an event with schema' 공유 예시를 사용하세요:

RSpec.describe MergeRequests::ApprovedEvent, feature_category: :code_review_workflow do
  it_behaves_like 'an event with schema',
    valid_data: { current_user_id: 1, merge_request_id: 2 },
    missing_required: %i[current_user_id merge_request_id],
    invalid_types: {
      current_user_id: 'not_an_integer',
      merge_request_id: 'not_an_integer',
      approved_at: 'not-a-date'
    }

  describe '#schema' do
    context 'with valid optional approved_at' do
      it 'accepts a date-time string' do
        data = { current_user_id: 1, merge_request_id: 2, approved_at: '2024-01-10T12:00:00Z' }

        expect { described_class.new(data: data) }.not_to raise_error
      end
    end
  end
end

발행자 테스트#

발행자의 역할은 이벤트가 올바르게 발행되었는지 확인하는 것입니다.

이벤트가 올바르게 발행되었는지 테스트하려면 RSpec 매처 :publish_event를 사용할 수 있습니다:

it 'publishes a ProjectDeleted event with project id and namespace id' do
  expected_data = { project_id: project.id, namespace_id: project.namespace_id }

  # 매처는 블록이 호출될 때 블록이 예상된 이벤트와 데이터를 발행하는지 확인합니다.
  expect { destroy_project(project, user, {}) }
    .to publish_event(Projects::ProjectDeletedEvent)
    .with(expected_data)
end

:publish_event 매처 내에서 매처를 조합하는 것도 가능합니다. 이는 특정 종류의 값으로 이벤트가 생성되었는지 확인하고 싶지만 값을 미리 알 수 없는 경우에 유용합니다. 이러한 예는 새 레코드를 생성한 후 이벤트를 발행할 때입니다.

it 'publishes a ProjectCreatedEvent with project id and namespace id' do
  # 프로젝트 ID는 expect 블록에서 `create_project`가
  # 호출될 때만 생성됩니다.
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }

  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
end

여러 이벤트를 발행할 때 발행되지 않은 이벤트도 확인할 수 있습니다.

it 'publishes a ProjectCreatedEvent with project id and namespace id' do
  # 프로젝트 ID는 `expect` 블록에서 `create_project`가
  # 호출될 때 생성됩니다.
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }

  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
    .and not_publish_event(Projects::ProjectDeletedEvent)
end

구독자 테스트#

구독자는 발행된 이벤트가 올바르게 소비될 수 있는지 확인해야 합니다. 이를 위해 구독자를 테스트하는 방식을 표준화하는 헬퍼와 공유 예시를 추가했습니다:

RSpec.describe MergeRequests::UpdateHeadPipelineWorker do
  let(:pipeline_created_event) { Ci::PipelineCreatedEvent.new(data: ({ pipeline_id: pipeline.id })) }

  # 이 공유 예시는 이벤트가 발행되고 현재 구독자(`described_class`)에 의해
  # 올바르게 처리되는지 확인합니다. 또한 워커가 멱등성을 가지는지 확인합니다.
  it_behaves_like 'subscribes to event' do
    let(:event) { pipeline_created_event }
  end

  # 이 공유 예시는 발행된 이벤트가 무시되는지 확인합니다. 이는
  # 조건부 디스패치 테스트에 유용할 수 있습니다.
  it_behaves_like 'ignores the published event' do
    let(:event) { pipeline_created_event }
  end

  it 'does something' do
    # 이 헬퍼는 `handle_event`가 올바르게 호출되는지 확인하면서 `perform`을 직접 실행합니다.
    consume_event(subscriber: described_class, event: pipeline_created_event)

    # 기대값 실행
  end
end

모범 사례#

이벤트가 항상 발생하는 코드(CE 또는 EE)와 동일한 위치에 이벤트 클래스를 정의하고 이벤트를 발행합니다.

이벤트가 CE 기능의 결과로 발생하는 경우, 이벤트 클래스는 CE에서 정의되고 발행되어야 합니다. 마찬가지로 이벤트가 EE 기능의 결과로 발생하는 경우, 이벤트 클래스는 EE에서 정의되고 발행되어야 합니다.

  • 이벤트에 의존하는 구독자를 종속 기능이 존재하는 동일한 코드(CE 또는 EE)에 정의합니다.

CE에서 발행된 이벤트(예: Projects::ProjectCreatedEvent)와 EE에서 정의된 이 이벤트에 의존하는 구독자(예: Security::SyncSecurityPolicyWorker)가 있을 수 있습니다.

  • 이벤트 클래스를 정의하고 동일한 bounded context(최상위 Ruby 네임스페이스) 내에서 이벤트를 발행합니다.

주어진 bounded context는 자체 컨텍스트와 관련된 이벤트만 발행해야 합니다.

  • 이벤트를 구독할 때 신호 대 잡음 비율을 평가합니다. 구독자 내에서 처리하는 이벤트 대비 무시하는 이벤트는 얼마나 되나요? 이벤트의 소규모 서브셋에만 관심이 있는 경우 조건부 디스패치 사용을 고려하세요. 조건부 디스패치로 동기 검사를 실행하는 것과 잠재적으로 중복된 워커를 예약하는 것 사이에서 균형을 잡으세요.