This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 4698b7a63 [FLINK-38550][connect/mongodb] Allow recovery from more 
change stream expiration scenarios (#4161)
4698b7a63 is described below

commit 4698b7a6397fafbf3e54d81ce7c29ad1f62d7d16
Author: Sachin Mittal <[email protected]>
AuthorDate: Tue Oct 28 14:25:51 2025 +0530

    [FLINK-38550][connect/mongodb] Allow recovery from more change stream 
expiration scenarios (#4161)
---
 .../cdc/connectors/mongodb/source/utils/MongoUtils.java  | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
index 949030667..318065072 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
@@ -79,6 +79,8 @@ public class MongoUtils {
     public static final int FAILED_TO_PARSE_ERROR = 9;
     public static final int UNAUTHORIZED_ERROR = 13;
     public static final int ILLEGAL_OPERATION_ERROR = 20;
+    public static final int CLOSE_CHANGE_STREAM = 222;
+    public static final int RETRY_CHANGE_STREAM = 234;
     public static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
     public static final int CHANGE_STREAM_FATAL_ERROR = 280;
     public static final int CHANGE_STREAM_HISTORY_LOST = 286;
@@ -88,6 +90,8 @@ public class MongoUtils {
     private static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS =
             new HashSet<>(
                     asList(
+                            CLOSE_CHANGE_STREAM,
+                            RETRY_CHANGE_STREAM,
                             INVALIDATED_RESUME_TOKEN_ERROR,
                             CHANGE_STREAM_FATAL_ERROR,
                             CHANGE_STREAM_HISTORY_LOST,
@@ -96,7 +100,6 @@ public class MongoUtils {
     private static final String RESUME_TOKEN = "resume token";
     private static final String NOT_FOUND = "not found";
     private static final String DOES_NOT_EXIST = "does not exist";
-    private static final String INVALID_RESUME_TOKEN = "invalid resume token";
     private static final String NO_LONGER_IN_THE_OPLOG = "no longer be in the 
oplog";
 
     private MongoUtils() {}
@@ -455,14 +458,13 @@ public class MongoUtils {
     // This check is stricter than checkIfChangeStreamCursorExpires, which 
specifically
     // checks if given exception is caused by an expired resume token.
     public static boolean checkIfResumeTokenExpires(final 
MongoCommandException e) {
-        if (e.getCode() != CHANGE_STREAM_FATAL_ERROR) {
+        if (e.getCode() != CHANGE_STREAM_FATAL_ERROR && e.getCode() != 
CHANGE_STREAM_HISTORY_LOST) {
             return false;
         }
         String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
-        return (errorMessage.contains(RESUME_TOKEN))
-                && (errorMessage.contains(NOT_FOUND)
-                        || errorMessage.contains(DOES_NOT_EXIST)
-                        || errorMessage.contains(INVALID_RESUME_TOKEN)
-                        || errorMessage.contains(NO_LONGER_IN_THE_OPLOG));
+        return errorMessage.contains(RESUME_TOKEN)
+                || errorMessage.contains(NOT_FOUND)
+                || errorMessage.contains(DOES_NOT_EXIST)
+                || errorMessage.contains(NO_LONGER_IN_THE_OPLOG);
     }
 }

Reply via email to