WebFlux, MongoDB 의 CappedPositionLost, ChangeStream, Flux Merge

작성자 : 조회수 :

원하는 것

-       유저들이 경매 입찰 가격을 올릴 때(수정할 때) 실시간 값 변동을 SSE로 연결 되어 있는 모든 클라이언트 들에게 알림을 전송 하도록 함.

 

문제 해결 과정

 

1.     MongoDB Reactivetailable은 말 그대로 값의 추가(insert)는 감지해 이벤트 발생을 알려주지만 수정은 감지하지 못해 SSE 전달이 안됨

2.     그럼 약간의 꼼수를 부리고자 무식한 방법을 사용(아래 사진 첨부) 값의 수정이 필요할 때, 수정이 아닌 새로운 인자를 저장 후 기존에 있던 인자는 삭제하는 한번의 요청에 DB쿼리 두 번 하도록 함.

3.     CappedPositionLost 발생, insertudpate는 크게 부하가 되지 않는 것 같은데 delete문은 무한 스트림 MongoDB에는 큰 부하가 작용하는 것 같음, 다른 원인으로는 끝 값에 가 있는 cursor가 마지막 문이 삭제 됐을 때, cursor가 다시 마지막 요소를 찾아가는데 시간이 걸리는 듯 함

4.     그럼 insert가 아닌 update또한 인식 가능한 ChangeStream을 사용하면 되지만, standard MongoDB가 아닌 replicaSet이 된 MongoDB 여야함

5.     간단한 replicaSet을 마치고 ChangeStream을 사용했으나, 사실 ChangeStream은 처음 요청 일 때 insertupdate자체의 감지가 되지 않아서, 먼저 저장 되어 있는 모든 값들을 내가 insert하거나 update하지 않는 이상 값을 전달해주지 않음.

6.     그럼 Spring WebFlux 메서드중 하나인 Flux.merge(병합) 기능을 사용해 커서가 꼬리 즉, 마지막 값까지 도달하게 해주는 Tailable 쿼리 와 update, insert를 감지해주는 chageStream 쿼리 둘을 병합(Merge) 하도록 해서 구독을 반환, 프론트에 SSE로 전달.



2번의 무식한 방법





기존에 사용하는 방법



위 사진과 같은 MongoDB의 수정이 있었을 때 문제없이 값이 삭제와 Insert가 동시에 진행된다.

 

하지만 여기서 치명적인 문제점이 생긴다.

 

바로 연속적인 요청이 들어왔을 때 CappedPositionLost가 작동한다.

 

해당 에러는 cursor가 따라가지 못하고 위치를 잃었다. 아니면 Capped 용량 범위를 넘어서서

 

인 것 같지만일단 내가 보기엔 savedelete가 동시에 일어나면서 생기는 부하 문제인 것 같았다.

 

Save는 죄가 없다. 다만 delete를 하면 끝의 커서를 잃어버려 다시 찾는데 시간이 걸리는 거 같았다.

 

그래서 해결방법을 강구하기 시작했다.

 

 

찾아보니 change Stream을 사용하면 된다고 했는데

 

changeStream을 사용하려면 아래 사진들처럼 MongoDBreplica set 이 필요하다고 했다.

 

replicaSet은 사실 단순 changeStream만 사용하기 위해 만들어진 기술이 아니라

 

안전한 데이터 관리를 위해 만들어 진 것으로 보였다.

 

참고 (https://www.mongodb.com/docs/manual/replication/)

 

 

일단 나는 필요하니 세팅을 맞춰보도록 하겠다.


(필요한 사람들은 공식문서를 참고해주세요. 여러 포트를 열어 다양하게 관리하는 코드).



(이 코드도 마찬가지 입니다.)

 

 

나는 일단 위의 사진들 처럼 다중 replica 설정은 필요 없어서

 


위와 같이 간단한 initiate를 입력해주었다. ! 그전에 필수 설정이 있는데.




위 파일경로에서 mongod.cfg 파일에서



이렇게 replica 설정을 먼저 해준 뒤 initiate 해야한다.


본격적으로 코드에서 Change Stream을 사용해보자

위 코드들을 보면 두가지 구독이 있다.

 

하나는 Tailable 된 구독, 다른하나는 ChangeStream으로 된 구독.

 

다른 하나는 Change Stream형식의 구독 입니다.

 

Change StreamOption이 필요합니다.

 

중간 조건 필터를 넣어주고 반환하도록 했습니다.

 

최종적으로 병합 된 구독들은 값을 쭉 Select해주면서 동시에 Insert, Update, Replace 모두 감지하도록 했습니다.



추가 참고 정보입니다.



@Tailable 어노테이션을 사용하여 capped collection 테일링할 , 테일링 중에는 해당 컬렉션에 대해 일부 제한이 적용됩니다. 특히, @Tailable 사용하여 반환된 Flux 구독하는 경우 해당 Flux 데이터 변경을 모니터링하며, Flux 구독을 해제하지 않으면 해당 capped collection 소비되는 디스크 공간을 계속 차지하게 됩니다.

따라서 capped collection 테일링하고 있는 동안에는 해당 컬렉션에서 데이터를 삭제하거나 변경하는 것이 권장되지 않습니다. 특히, delete문을 사용하면 삭제된 데이터가 capped collection에서 제거되지 않고 소비되는 디스크 공간이 해제되지 않아 이상 새로운 데이터를 추가할 없는 CappedPositionLost 오류가 발생할 있습니다.

따라서 @Tailable 어노테이션을 사용하여 capped collection 테일링하는 경우, 데이터를 삭제하려면 삭제 대신 해당 데이터의 상태를 나타내는 필드를 추가하고 해당 필드를 업데이트하여 데이터를 이상 사용하지 않음을 표시하는 것이 좋습니다. 또한 capped collection 크기를 관리하고 필요하지 않은 데이터를 주기적으로 삭제하여 capped collection 크기를 유지하도록 구성하는 것이 좋습니다.

 

위의 글은 참고만 해주세요.


출처 :

 

https://www.mongodb.com/docs/manual/tutorial/convert-standalone-to-replica-set/