SpringBoot 프로젝트에서 SSE를 활용한 알림 기능 개발
도입 배경
Petory 프로젝트에서 사용자에게 알림을 보내야 할 경우가 생겼다.
어떤 조건을 만족시켰을 때 스켸줄러로 자주 체크하여 알림 정보를 보내주는 방법이 현재 나의 지식으로 구현할 수 있는 당장 떠오른 방법이다.
다만 이렇게 하자니 실시간으로 알림을 보내려는 취지에 부합하지 않다고 느껴졌다. 그래서 실시간으로 알림을 보낼 수 있는 다른 방법을 찾아보았다 !
스프링부트에서 구현할 수 있는 알림 방식
1. Pollling
HTTP를 이용한 서버-클라이언트 간 데이터 전달
동작과정
일정 주기로 클라이언트가 서버에 데이터를 요청하고 서버가 현재 상태를 응답.(데이터가 없어도 응답함)
장점
구현이 간단하다.
단점
업데이트 주기가 길면 실시간 성이라고 보기 어렵고 짧으면 불필요한 요청을 보내기 때문에 서버에 부하를 준다.
2. Long-Polling
HTTP를 이용한 서버-클라이언트 간 데이터 전달
** HTTP 요청은 단발성(staeless) 통신 방식이다. (클라이언트 요청 -> 서버 응답 후 연결 종료)
Long Polling은 HTTP 요청이지만 서버가 즉시 응답하지 않고 대기하는 방식이다.
동작 과정
클라이언트가 서버에 요청을 보내면 서버는 데이터가 준비될 때까지 응답을 보류.
서버가 응답하면 클라이언트는 그 즉시 또 요청을 보냄.
장점
일반 Polling처럼 반복적으로 요청하지 않으니 네트워크 트래픽은 줄어든다.
데이터가 업데이트 되자마자 응답 하니까 거의 실시간으로 동작하는 것처럼 보인다.
Websocket보다 구현 난이도가 낮아 Websocket을 사용할 수 없는 환경에서 실시간성이 필요할 때 사용.
단점
업데이트가 잦을 경우 이 또한 많은 요청을 보내 부하를 줄 가능성이 있다.
Polling, Long-Polling 모두 REST API 처럼 API를 호출하여 동작한다.
Polling과 Long Polling 구현 시 백엔드에서 API를 제공하고 프론트에서 반복적으로 요청을 보내는 로직이 필요하다.
2. Websocket
클라이언트와 서버의 양방향 통신을 지원하는 프로토콜로, 실시간 알림이 필요할 때 주로 사용한다.
Spring Websocket을 이용하여 구현가능하고 STOMP 프로토콜을 이용하면 더 편리하다.
장점
HTTP 요청/응답 방식보다 서버 부하가 적다.
서버가 클라이언트에 즉시 알림을 보낼 수 있다.
단점
클라이언트와 서버의 지속적인 연결 유지를 위한 리소스 사용량이 증가할 수 있다.
많은 사용자가 연결한 경우 서버 부담이 커지므로 pub/sub 구조나 kafka와 연동하여 확장하는 작업이 필요하다.
Websocket은 서버와 클라이언트가 한 번 연결이 되면 서버에서 바로 데이터를 보낼 수 있기 떄문에 클라이언트가 반복적으로 요청을 보낼 필요가 없다.
3. SSE (Server-Sent Events)
서버가 클라이언트로 단방향 메세지 스트리밍
EventSource API를 이용하여 브라우저에서 쉽게 수신이 가능하다.
장점
Websocket보다 구현이 간단
자동 재연결기능 내장
단점
클라이언트가 서버로 데이터를 전송할 수 없음.
하나의 브라우저에서 여러 SSE 연결을 허용하지 않을 수 있다.
4. FCM (Firebase Cloud Messaging)
Google의 FCM을 활용해서 모바일 알림이나 웹 푸시 알림 구현 가능
FCM SDK 또는 HTTP API를 사용하여 알림 전송 가능.
** 2024.7월 이후 기존 HTTP API 지원이 종료됨. V1 사용
장점
Android, ios, 웹 푸시 지원
백그라운드 알림 기능으로 앱이 꺼져있어도 푸시 알림을 받을 수 있다.
Firebase 콘솔에서 쉽게 설정 가능
단점
클라이언트가 네트워크에 연결되어있어야 알림을 받을 수 있다.
5. 이메일 알림
SMTP를 이용한 이메일 알림
6. 문자 알림
비용부과
정리
실시간성이 크게 중요하지 않을 때
📌Polling
📌Long-Polling
(DB 기반 알림)
실시간 알림을 구현하고 싶을 때
📌 Websocket (대화형 알림, 실시간 채팅)
📌 SSE (단방향 실시간 알림)
모바일/웹 푸시 알림이 필요할 때
📌FCM (Firebase Cloud Messaging)
결론
내가 구현하려고 하는 것은 특정 상황들에서 실시간으로 서버가 클라이언트에게 알림을 보내주는 것이다.
Polling을 실시간처럼 구현하려면 주기를 짧게 해야하는데 트래픽이 많아질 경우 서버에 부하를 줄 수 있으므로 기각
Long-Polling도 트래픽이 많아지면 요청이 늘어나므로 기각.
Websocket과 SSE 중 클라이언트 -> 서버에게는 데이터를 보낼 필요 없으므로 SSE 채택.
또한 나는 실시간 알림 뿐 아니라 알림 탭에서 이전 알림도 함께 조회할 수 있게 만들거라 알림 정보를 DB에 저장하는 로직도 필요하다.
구현
Flow
1. 사용자 로그인 후 클라이언트가 SSE 연결 요청. (/subscribe)
2. 서버에서 SseEmitter를생성하여 해당 유저의 알림 관리
3. 알림 발생하는 상황 발생시 알림 엔티티 저장 및 SSE로 실시간 전송 (SseEmitter.send()이용)
4. 사용자가 알림 목록을 조회할 수 있도록 추가.
5. 알림 읽음 처리 API
**SseEmitter는 Spring Boot에서 SSE(Server-Sent Events)를 지원하는 클래스.
서버에서 클라이언트로 실시간 데이터를 푸시하는 역할을 한다.
+ 연결이 너무 오래 유지될 때 타임아웃 설정 가능.
+ 연결이 종료되었을 때, 오류가 발생했을 때 처리 가능.
구현 과정
1. /subscribe : 구독요청 컨트롤러 & 서비스
클라이언트에서 EventSource 통해 구독 요청 -> SseEmitter 객체 map에 저장
@RestController
@RequestMapping("/notification")
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
@GetMapping("/subscribe")
public SseEmitter subscribe(
@RequestParam("token") String token) {
return notificationService.subscribe(token);
}
처음엔 로그인한 사용자 정보를 받아오기 위해 다른 api 처럼 @AuthenticationPrincipal을 이용해 헤더에서 토큰을 받아오도록 함.
But 프론트에서 EventSource로 sse요청을 보낼 때 헤더를 설정할 수 없었음.
-> @RequestParam으로 token을 받아오는 방식으로 수정
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private final Map<Long, SseEmitter> emitterMap = new HashMap<>();
private final JwtUtils jwtUtils;
//SSE 구독 (클라이언트가 알림을 수신하기 위해 호출)
public SseEmitter subscribe(
final String token) {
// 1. 로그인 한 사용자 정보 얻어오기
Authentication authentication = jwtUtils.getAuthentication(token.substring("Bearer ".length()));
MemberAdapter memberAdapter = (MemberAdapter) authentication.getPrincipal();
Long memberId = memberAdapter.getMemberId();
// 2. SseEmitter 객체 생성하고 Timeout 설정
SseEmitter emitter = new SseEmitter(60_000L);
// 3. memberId를 키로 map에 Emitter 객체를 저장하는데 이미 있으면 삭제 후 map에 저장
if (emitterMap.containsKey(memberId)) {
SseEmitter oldEmitter = emitterMap.remove(memberId);
oldEmitter.complete();
}
emitterMap.put(memberId, emitter);
log.info("SSE 연결 요청 : memberId = {}", memberId);
// 4. SSE 연결 직후 더미 데이터 보내기
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("SSE 연결 성공"));
log.info("SSE 더미 이벤트 전송 완료");
} catch (IOException e) {
log.error("SSE 연결 실패 : ", e);
emitter.completeWithError(e);
}
// 5. SSE 통신에서의 콜백 메소드
emitter.onCompletion(() -> {
emitterMap.remove(memberId);
log.info("SSE 연결 종료 : memberId = {}", memberId);
});
emitter.onTimeout(() -> {
emitterMap.remove(memberId);
log.info("SSE 연결 타임아웃 : memberId = {}", memberId);
});
return emitter;
}
1. token으로 현재 로그인한 사용자 정보를 받아와야하기 때문에 JwtUtil에 만들어둔 getAuthentication() 메서드 사용
getAuthentication()은 다음과 같이 만들어두었음.
public Authentication getAuthentication(String token) {
UserDetails userDetails = userDetailsService.loadUserByUsername(
parseEmail(token));
return new UsernamePasswordAuthenticationToken(
userDetails
, ""
, userDetails.getAuthorities()
);
}
Authentication 객체() <- Spring security의 SecurityContext에 등록되어 이후 SecurityContextHolder.getContext().getAuthentication()으로 현재 인증된 사용자를 참조할 수 있다.
: userDetailService에 loadUserByUsername()메소드로 얻을 수있는 UserDetails를
-> UsernamePasswordAuthenticationToken 메소드에 파라미터로 넣어 얻을 수 있는 객체이다.
(복습할겸 덧붙이면 이 loadUserByUsername메소드에서 UserDetails 객체를 상속받은 MemerAdapter 객체를 리턴하고
@AuthenticationPrincipal을 통해 사용할 수 있음 !)
2. Timeout 시간을 설정하여 SseEmitter 객체 생성
3. 만들어둔 emitterMap에 memberId를 키로 SseEmitter 객체를 등록한다. (기존에 이미 있었으면 삭제 후 저장)
4. SseEmitter.send() 메소드를 통해 실시간 알림을 전송한다.
name과 data를 구분해서 보내면 클라이언트에서 name으로 오는 알림을 eventListener로 잡아 data를 뽑아 쓸 수 있다.
5. SSE 통신에서 연결관리를 위한 콜백 메소드
emitter.onComplete() : 클라이언트와의 SSE 연결이 정상적으로 종료될 때 호출
EmitterMap에 저장되어있는 데이터를 지움. 메모리 누수 방지
emitter.onTimeOut() : 클라이언트가 일정 시간 동안 반응이 없거나 지정한 타임아웃 시간 초과 시 호출
연결이 끊긴 것이므로 emitter 제거가 필요함.
언제 사용? ) SseEmitter는 서버와 클라이언트 간에 지속적인 연결을 유지하는 객체여서 연결 끊기는 시점에 메모리를 지속적으로 관리해줘야한다.
이 과정을 거치면 각각 터미널과 postman에 이렇게 연결됐다는 표시가 뜬다.
** 프론트에서 자동으로 재연결을 보내고 있기 때문에 타임아웃 시간을 짧게 해놓아도 괜찮다. (60000ms = 1분)
브라우저의 EventSource 객체는 내부적으로 자동 재연결을 시도한다.(default = 3s)
2. 알림이 발생하는 상황에 호출할 sendNotification()메소드
내 프로젝트에서 알림이 발생하는 경우는 4가지로 다음과 같다.
- 친구 요청이 왔을 때
- 내가 건 요청이 수락/거절 됐을 때
- 일정 알림
- 내 게시물에 댓글/좋아요 달렸을 때
1. 이때에 친구 관련 알림의 경우 friendService의 해당 메소드들에서 각각 다 Member 객체를 조회한다.
2. 하지만 일정 알림의 경우 scheduleService에서 바로 sendNotification()을 호출하는 것이 아니고 schedule_notice_time 테이블에 따로 저장을 해둔뒤에 @Scheduled를 이용해 매 분마다 테이블에서 현재 시간과 동일한 데이터를 찾아 필요한 정보를 채워서 sendNotification()메소드를 호출한다.
이 경우에는 schedule_notice_time과 schdule_notice_receiver 테이블로 조회해서 필요한 정보를 얻어오는데 이때 알림을 받아야 할 사용자들은 Member 객체가 아니고 schdule_notice_receiver테이블에 아이디로 저장이 되어있다.
따라서 이 두 가지 경우를 모두 만족시키는 알림 전송 메소드를 만들기 위해 오버라이딩을 통해 Member객체가 있을 때와 없을 때로 구분하여 만들었다.
//메소드 오버라이딩
public void sendNotification(
final NotificationPayloadDto noticePayLoad) {
Member receiveMember = memberRepository.findById(
noticePayLoad.getReceiveMemberId())
.orElseThrow(() -> new MemberException(ErrorCode.MEMBER_NOT_FOUND));
sendNotification(receiveMember, noticePayLoad);
}
public void sendNotification(
final Member receiveMember,
final NotificationPayloadDto noticePayLoad) {
// 1. 알림 객체 저장
notificationRepository.save(
Notification.builder()
.member(receiveMember)
.noticeType(noticePayLoad.getNoticeType())
.entityId(noticePayLoad.getEntityId())
.isRead(false)
.build());
// 2. sse 실시간 알림 전송
SseEmitter emitter =
emitterMap.get(
noticePayLoad.getReceiveMemberId());
if (emitter != null) {
noticePayLoad.setUnReadCount(getNoticeCount(receiveMember));
try {
emitter.send(SseEmitter.event()
.name(String.valueOf(noticePayLoad.getNoticeType()))
.data(noticePayLoad));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
구독 요청에 더미데이터를 보내는 로직처럼 emitter.send() 메소드를 통해 실시간 이벤트를 전송한다.
여기서 unReadCount도 실시간으로 보내는데 프론트가 안읽은 알림 갯수를 보내달라고 해서 추가했다.
새로운 알림이 생기면 == 안 읽은 알림이 추가 되므로 알림 전송 로직에 함께 세팅해보낸다.
또한 추후에 알림 목록 탭에서 로그인한 사용자를 기준으로 알림 목록을 조회하기 위해 Notification 객체도 저장한다.
NotificationPayloadDto는 다음과 같이 구성하여 프론트가 알림 탭에서 필요한 정보를 뽑아 쓸 수 있도록 하였다.
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class NotificationPayloadDto {
private Long receiveMemberId;
private NoticeType noticeType;
private Long entityId;
private Long sendMemberId;
private String sendMemberName;
private Long unReadCount;
}
3. 알림 목록 API
// Controller
@GetMapping
public ResponseEntity<Page<NoticeListResponse>> noticeList(
@AuthenticationPrincipal MemberAdapter memberAdapter
, Pageable pageable) {
return ResponseEntity.ok(notificationService.noticeList(
memberAdapter, pageable));
}
//Service
public Page<NoticeListResponse> noticeList(
final MemberAdapter memberAdapter
, Pageable pageable) {
Member member = getMemberByEmail(memberAdapter.getEmail());
List<NoticeListResponse> notifications =
notificationRepository.findByMember(member, pageable)
.stream().map(Notification::toListDto).toList();
return new PageImpl<>(notifications, pageable, notifications.size());
}
4. 알림 읽음 처리 API
알림 읽음 처리를 하면 안 읽은 알림 갯수에 변동이 있으므로 이때도 안 읽은 알림 갯수를 실시간 전송한다.
컨트롤러
@PatchMapping("/{noticeId}")
public ResponseEntity<Boolean> markAsRead(
@AuthenticationPrincipal MemberAdapter memberAdapter
, @PathVariable Long noticeId) {
return ResponseEntity.ok(notificationService.markAsRead(
memberAdapter, noticeId));
}
서비스
@Transactional
public boolean markAsRead(
final MemberAdapter memberAdapter, final Long noticeId) {
Member member = getMemberByEmail(memberAdapter.getEmail());
Notification notification = notificationRepository.findByNoticeIdAndMember(noticeId, member)
.orElseThrow(() -> new NoticeException(ErrorCode.INVALID_NOTIFICATION));
if (!notification.isRead()) {
notification.setRead(true);
}
SseEmitter emitter = emitterMap.get(member.getMemberId());
if (emitter != null) {
try {
long unReadCount = getNoticeCount(member);
emitter.send(SseEmitter.event()
.name("unReadCount")
.data(unReadCount));
} catch (IOException e) {
log.error("읽음 처리 후 unReadCount 전송실패");
}
}
return true;
}
더티체킹을 위해 @Transactional을 달아 변경 사항을 감지하고 commit 시점에 DB에 반영하도록 하였다.
알림 전송 로직은 처음과 동일 다른 점은 이때는 단독으로 unReadCount를 보내준다.
이렇게 하면 ! 알림 전송 구현 완료 !
알림을 설정하면 일정과 관련된 모든 사용자에게 실시간으로 알림이 전송된다.
< 좌 일정 생성자 우 일정에 포함된 반려동물의 돌보미 >
Trouble Shooting
1. DB 커넥션 pool 오류
문제 :
새로고침을 하거나 친구요청을 몇 번 시도하면 데이터베이스 커넥션 풀 문제 발생
(HikariCP에서 발생한 Connection pool exhaustion(연결 고갈) 문제)
HikariPool-1 - Connection is not available, request timed out after 30002ms
원인 :
SSE는 HTTP 연결을 계속 유지하는데 이 경우에 DB를 조회하고 있고 이 요청들이 유지되며 DB 커넥션을 점유하고 있어서 커넥션 풀이 고갈됨.
⇒사용 가능한 DB 커넥션이 계속 소비됨.
또한 subscribe 시에 token으로 사용자 인증 후 DB에서 사용자 정보를 조회하거나 알림 목록을 조회함.
이 DB 요청들이 풀을 다 써버린 것
해결 :
- /subscribe에서 DB 조회 없이 memberId를 가져옴. 이렇게 하기 위해서 memberAdapter에 memberId 필드를 추가하고 loadByUsername() 메소드에서 memberId까지 넣어줌.
- 방어 코드 추가 ⇒ 중복 연결 방지 및 자원 정리.
if (emitterMap.containsKey(memberId)) {
SseEmitter oldEmitter = emitterMap.remove(memberId);
oldEmitter.complete();}emitterMap.put(memberId, emitter);
}
- oldEmitter.complete() 는 기존에 존재하던 SSE 연결을 정상적으로 종료하는 메서드. 더이상 이벤트를 보내지 않겠다 !
2. SSE 연결에서 타임아웃이 발생했을 때 예외를 처리하는 방식에서 오류
org.springframework.http.converter.HttpMessageNotWritableException: No converter for [class com.sj.Petory.exception.dto.ErrorResponse] with preset Content-Type 'text/event-stream'
원인 :
timeOut()시 예외가 발생하면 @RestControllerAdvice에서 잡힘.
GlobalExeptionHandler에서 예외를 잡아 ErrorResponse 객체를 반환하려고 했으나 Sse 는 Content-Type : text/event-stream 을 사용하기 때문에 ErrorResponse(JSON) 형식으로 내려줄 수가 없어 발생한다.
해결 :
GlobalExceptionHandler에 @ExceptionHandler를 추가하여 log만 남기고 ErrorResponse를 내려주지 않도록 했음
@ExceptionHandler(AsyncRequestTimeoutException.class) public void handleAsyncTimeoutException(AsyncRequestTimeoutException e) { log.warn("SSE 연결 타임 아웃"); }
+onTimeOut() 처리를 해주었음에도 AsyncRequestTimeoutException이 발생하는 이유 : 서버가 일정 시간동안 클라이언트에 응답을 보내지 않으면 타임아웃이 발생하고 해당 예외를 띄운다.
AsyncRequestTimeoutException → Spring이 백그라운드에서 throw 함
*ContentType은 서버가 클라이언트에게 보내는 HTTP 응답의 형식
ContentType 종류
- application/json : json 형식의 데이터
- text/plain : 일반 텍스트
- text/event-stream : SSE 형식
- text/html : HTML 문서
참고
Redis Pub/Sub 기반 SSE(Server-Sent Events) 실시간 알림 적용기
이 글에서는 서비스 이용자에게 알림을 제공하기 위해 30초마다 주기적으로 서버에 API 호출을 하여 데이터를 받아오던 Polling 방식에서 SSE (Server-Sent-Event) 를 활용하여 실시간 알림 기능을 구현하
velog.io
https://taemham.github.io/posts/Implementing_Notification/#%EA%B5%AC%ED%98%84
[백엔드 스프링부트] 알림 기능은 어떻게 구현하는게 좋을까?
This is a wannabe backend developer’s dev blog.
taemham.github.io
[백엔드 스프링부트] 알림 기능은 어떻게 구현하는게 좋을까?
This is a wannabe backend developer’s dev blog.
taemham.github.io