본문 바로가기
공부/Spring Boot

SpringBoot) 비동기 이벤트 기반으로 Elasticsearch와 RDB간 정합성 유지를 위한 노력

by son_i 2025. 4. 19.
728x90

문제 상황

회원 정보를 변경하거나 수정하면 더이상 검색 결과에 뜨지 않았다.

 

기존 구현 :

  회원을 이메일과 이름으로 ES를 이용해 검색할 수 있고 이 각각의 도큐먼트는 회원가입 시에 Member 테이블 + Member 인덱스에 저장이 된다. 

 

이후에 회원 정보 수정으로 이름을 변경하면 더이상 검색으로 다른 사용자를 찾을 수 없었고 - problem1

회원 탈퇴를 했을 때 (softDelete라 DB에 남아는 있지만)도 여전히 검색으로 조회를 할 수 있었다. - problem2

 

생각해보니 회원 수정이나 삭제 할 때는 ES에 반영해주는 로직을 구현하지 않았었다 !

이럴 경우 RDB와 ES간 정합성이 깨지는 문제가 발생한다.


해결 방법 (고민)

일단 제일 간단하게는 회원 정보 수정, 삭제 로직에 ES에도 도큐먼트를 똑같이 수정 삭제 하는 로직을 구현하는 것이다.

 이 방법의 문제점은 구현은 간단하지만 추후 기능이 확장될 경우 이 서비스 로직에 다른 관심사를 가진 기능들이 들어가는 것이다.  ==>> SRP를 위반하게 됨.

 

그래서 Scheduler를 통해 일정 시간마다 DB와 ES간의 정보가 다르면 ? 업데이트 시켜주는 방식도 생각해봤는데 잦은 시간에 Scheduler를 동작시키기엔 DB 부하가 걱정이 되고 (사실 그렇게까지 중요한 기능은 아닌 것 같으므로) 그렇다고 시간을 길게 잡으면 또 사용자 입장에서 불편함이 생길 것 같았다.

 

바로 직전에 알림 기능을 구현했는데 여기서 프론트가 이벤트 리스너를 사용해 내가 보내주는 sse 특정 알림이 발생하면 사용자에게 푸시 알림을 내려주고 있던 것에서 스프링부트에는 이벤트 처리 방식이 없나 궁금했다.

 

찾아보니 당연히 존재했고 또한 이벤트 기반으로 구현을 하면 나중에 Kafka나 RabbitMQ같은 메시징 시스템으로 확장까지 가능하다는 내용을 알게 됐다 !

 

>> Kafka & RabbitMQ ! <<

앞전 프로젝트에서 Websocket(STOPM)를 활용한 채팅기능 개발할 때도 들어본 적이 있고  

요번에 알림 기능을 개발하면서도 특정 상황이 발생하면 실시간으로 sse 알림을 전송해주는데 이를 kafka나 RabbitMQ같은 외부 메시징 브로커를 이용해 확장해보려고 고려중이었다.

  => 이유는 배포 서버에서 클라이언트에게 알림이 보내져야하는데 씹히는 문제가 발생하기도 했고 DB 커넥션 풀 문제도 발생했기 때문이다. (프론트가 새로고침 8번 정도 하면 DB Connection pool 오류 뜨면서 서버 다운)
 일단 notificationService로직을 호출하는 friendService의 requestProcess에 @Transaction이 걸려있다. 아마도 이걸 원인으로 DB Connection Pool오류가 나는 거 같음.

 

따라서 이벤트 기반 처리 방식을 구현해보기로 하였다.

 


Document 구조 변경

구현에 앞서 Document 구조가 좀 효율적이지 않다고 느껴졌다. memberId, email, name, image가 들어있었는데 image는 회원 검색해서 조회 결과로는 필요하지만 ES document에 넣어둘 경우 이미지를 변경할 때마다 ES 도큐먼트도 또 수정해줘야 한다.

image는 검색에 쓰이지도 않으므로 여기에 들어있는 것은 불필요했다.

 

그럼 image를 document에서 제거하면 ?

                    memberSearchResponse에는 image가 필요하므로 회원 DB에서 또 조회를 해줘야한다.

응 ????????????????

이 방법의 문제점 (이라고 생각한 것)

 

Full Scan을 막기 위해서 ES를 이용한 검색 기능을 적용한 건데 ES에서 결과로 찾아온 document에서 memberId로 RDB에서 또 회원을 조회한다 ? ES를 쓴 이유가 없지 않나 라고 생각했다.

 

하지만 결국은 이점

하지만 이렇게 검색을 ElasticSearch로 진행하고 얻어온 memberDocument에서 memberId로 회원 table에서 조회를 하면 Id 값에 인덱스가 자동 생성되어 있어서 Full scan이 아닌 PK 또는 인덱스 기반 조회가 된다.

 

따라서 ES는 검색 + DB는 데이터 제공 이라는 역할 분리를 통해 검색에 필요한 항목만 document에 넣어서 빠르게 조회하고 얻어온 id로 DB에서 최신의 정확한 정보를 얻는 전략을 사용하게 되었다.

 

 

기존구조

사용자 검색(ES) 으로 얻은 document -> dto로 변환해 사용자에게 전달.

(이 경우 Document의 구조는 회원 조회 검색 조건과 Response에 필요한 memberId, name, email, image모두를 담고있음.)

🔽

바뀐구조

사용자 검색(ES) 으로 얻은 document -> membeId로 프로젝트 DB에서 Member entity 조회 -> dto로 변환해 사용자에게 전달.

(이 경우 Document의 구조는 검색 조건에 필요한 memberId, name, email만 가져도 됨.)


구현 과정

1. Document 필드 변경

package com.sj.Petory.common.es;

import com.sj.Petory.domain.friend.dto.MemberSearchResponse;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Setting(settingPath = "/elastic/member-settings.json")
@Mapping(mappingPath = "/elastic/member-mappings.json")
@Document(indexName = "member")
public class MemberDocument {

    @Id
    private Long memberId;

    private String name;
    private String email;

    public MemberDocument updateName(String newName) {

        return MemberDocument.builder()
                .memberId(this.getMemberId())
                .name(newName)
                .email(this.getEmail())
                .build();
    }
}

 

2. 회원 수정, 삭제 시에 Event 발행

///회원 수정 시에 발생시킬 MemberUpdatedEvent
@RequiredArgsConstructor
@Getter
public class MemberUpdatedEvent {

    private final Long memberId;
    private final String name;
}

///회원 삭제 시에 발생시킬 MemberDeletedEvent
@RequiredArgsConstructor
@Getter
public class MemberDeletedEvent {

    private final Long memberId;
}

 

updateMember, deleteMember 메소드 안에 eventPublisher.publishEvent(new 각 발생시킬 이벤트()) 를 호출해 이벤트 발생

@Transactional
    public boolean updateMember(final MemberAdapter memberAdapter, final UpdateMemberRequest request) {
        Member member = getMemberByEmail(memberAdapter.getEmail());

        String name = request.getName();
        if (name != null) {
            checkNameDuplicate(name);

            eventPublisher.publishEvent(
                    new MemberUpdatedEvent(member.getMemberId(), name));
        }

        if (StringUtils.hasText(request.getPassword())) {
            request.setPassword(
                    passwordEncoder.encode(request.getPassword()));
        }
        member.updateInfo(request);

        return true;
    }

    @Transactional
    public boolean deleteMember(final MemberAdapter memberAdapter) {
        Member member = getMemberByEmail(memberAdapter.getEmail());

        validateDeleteMember(member);

        eventPublisher.publishEvent(new MemberDeletedEvent(member.getMemberId()));

        member.updateStatus(MemberStatus.DELETED);

        return true;
    }

 

3. EventListener에서 발생한 이벤트 잡아서 처리

package com.sj.Petory.domain.member.event;

import com.sj.Petory.common.es.MemberDocument;
import com.sj.Petory.common.es.MemberEsRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@RequiredArgsConstructor
@Component
@Slf4j
public class MemberEventListener {

    private final MemberEsRepository memberEsRepository;

    @EventListener
    public void handleMemberUpdateEvent(MemberUpdatedEvent event) {
        log.info("MemberEventListener handleMemberUpdateEvent");
        log.info(event.getName());
        memberEsRepository.findById(event.getMemberId())
                .ifPresent(doc -> {
                    MemberDocument updatedDoc = doc.updateName(event.getName());
                    memberEsRepository.save(updatedDoc);
                });
    }

    @EventListener
    public void handleMemberDeletedEvent(MemberDeletedEvent event) {

        memberEsRepository.deleteById(event.getMemberId());
    }
}

해당 이벤트가 발생했을 때 @EventListener에서 잡아서 처리할 동작 메소드에 정의

(나는 이걸 만들면서 예외처리와 굉장히 비슷하다고 느꼈다 !)

예외처리도 @ExceptionHandler 어노테이션을 붙인 메소드에 파라미터로 처리할 예외 클래스를 적어주니까 ! ㅎㅎ
그래서 이해가 더 쉬웠다.

 

 

이럼 너무 간단하게 이벤트 처리 완성이다.

근데 난 여기에 비동기 처리까지 구현했다.

 

4. 비동기 처리 위해 AsyncConfig + EventListener메소드에 @Async 추가

package com.sj.Petory.config;

import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.scheduling.annotation.EnableAsync;

@Configurable
@EnableAsync
public class AsyncConfig {

}
package com.sj.Petory.domain.member.event;

import com.sj.Petory.common.es.MemberDocument;
import com.sj.Petory.common.es.MemberEsRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.Optional;

@RequiredArgsConstructor
@Component
@Slf4j
public class MemberEventListener {

    private final MemberEsRepository memberEsRepository;

    @Async
    @EventListener
    public void handleMemberUpdateEvent(MemberUpdatedEvent event) {
        log.info("MemberEventListener handleMemberUpdateEvent");
        log.info(event.getName());
        memberEsRepository.findById(event.getMemberId())
                .ifPresent(doc -> {
                    MemberDocument updatedDoc = doc.updateName(event.getName());
                    memberEsRepository.save(updatedDoc);
                });
    }

    @Async
    @EventListener
    public void handleMemberDeletedEvent(MemberDeletedEvent event) {

        memberEsRepository.deleteById(event.getMemberId());
    }
}

 

비동기 처리를 해준 이유는 다음과 같다.

1. ES에 저장하고 삭제하는 작업은 네트워크 I/O가 포함되어 느릴 수 있다. 

2. ES가 죽어도 회원 수정은 되어야 하니까 (근데 이 경우 스켸줄러로 DB와 ES를 동기화해주는 작업이 필요하겠군)

3. 이벤트 기반 + 비동기로 처리했을 경우 추후 메시징 시스템으로 확장하기 쉬움.

 

 

이런 비동기 처리를 해줌으로써 신경써야 할 것

1. @Async는 별도의 스레드에서 실행이 되므로 이벤트 실행 시점에는 DB 트랜잭션이 커밋 되어있어야 한다 !

2. 그리고 비동기 스레드는 예외가 바로 뜨지 않아서 로깅이나 모니터링이 필요 (별도의 로그 관리 파일을 만들어야겠군)

 

 

기존 updateMember와 deleteMember에는 @Transactional이 붙어있고 이게 트랜잭션 커밋후에 이벤트가 발생함을 보증하지 않는다. 이벤트가 별도의 스레드로 실행되긴 하지만 실행 시점이 트랜잭션 커밋보다 빠를 수 있음.

@Transactional
    public boolean updateMember(final MemberAdapter memberAdapter, final UpdateMemberRequest request) {
        ...
            eventPublisher.publishEvent(
                    new MemberUpdatedEvent(member.getMemberId(), name));
    	...	
    }

 

따라서 이벤트를 트랜잭션 커밋 후에 동작하게 하는 작업이 필요하다 !

@Transactional
    public boolean deleteMember(final MemberAdapter memberAdapter) {
	...
    
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                eventPublisher.publishEvent(new MemberDeletedEvent(member.getMemberId()));
            }
        });
        
	...
    }

TrascationSynchronizatioManager를 이용해 이벤트 발생 작업이 트랜잭션 커밋 이후에 일어나도록 하였다.

 

로그 찍어본 결과

트랜잭션 커밋 후에 이벤트가 발생한 것을 확인 할 수 있따 !

 

member.updateInfo() 실행 후 DB에 변경 사항 업데이트 -> TransactionSynchronizationManager.registerSynchronization()통해 커밋 이후 이벤트 등록 -> 메서드 끝나고 트랜잭션 커밋 -> 커밋 후 afterCommit() 실행 -> 이벤트 발생

 

 

이렇게 ES와 DB 정합성을 위한 비동기 이벤트 기반 처리는 완료 ,, !

미루고 미뤘던 큰 산을 넘은 느낌.. 생각보다 너무 간단해서 놀랐다.

해보지도 않고 겁먹지말자 !


고찰

이대로 알림 기능도 이벤트 기반으로 바꾸면 좋을 거 같고 앞서 말했던 DB Connection pool오류나 배포 환경에서 알림이 바로 오지 않는 점 등을 해결하기 위해 메시징 시스템으로 확장도 고려해봐야겠다.

 

비동기 방식으로 ES와의 정합성 로직이 구현되어 있어서 ES가 이상이 있어도 DB는 정상 동작한다. 이에 또 정합성 문제가 발생할 수 있으므로 스켸줄러를 통해 DB와 ES간의 데이터를 일치시키는 작업이 추가로 필요할 것 같다.

728x90