This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 4698b7a63 [FLINK-38550][connect/mongodb] Allow recovery from more
change stream expiration scenarios (#4161)
4698b7a63 is described below
commit 4698b7a6397fafbf3e54d81ce7c29ad1f62d7d16
Author: Sachin Mittal <[email protected]>
AuthorDate: Tue Oct 28 14:25:51 2025 +0530
[FLINK-38550][connect/mongodb] Allow recovery from more change stream
expiration scenarios (#4161)
---
.../cdc/connectors/mongodb/source/utils/MongoUtils.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
index 949030667..318065072 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
@@ -79,6 +79,8 @@ public class MongoUtils {
public static final int FAILED_TO_PARSE_ERROR = 9;
public static final int UNAUTHORIZED_ERROR = 13;
public static final int ILLEGAL_OPERATION_ERROR = 20;
+ public static final int CLOSE_CHANGE_STREAM = 222;
+ public static final int RETRY_CHANGE_STREAM = 234;
public static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
public static final int CHANGE_STREAM_FATAL_ERROR = 280;
public static final int CHANGE_STREAM_HISTORY_LOST = 286;
@@ -88,6 +90,8 @@ public class MongoUtils {
private static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS =
new HashSet<>(
asList(
+ CLOSE_CHANGE_STREAM,
+ RETRY_CHANGE_STREAM,
INVALIDATED_RESUME_TOKEN_ERROR,
CHANGE_STREAM_FATAL_ERROR,
CHANGE_STREAM_HISTORY_LOST,
@@ -96,7 +100,6 @@ public class MongoUtils {
private static final String RESUME_TOKEN = "resume token";
private static final String NOT_FOUND = "not found";
private static final String DOES_NOT_EXIST = "does not exist";
- private static final String INVALID_RESUME_TOKEN = "invalid resume token";
private static final String NO_LONGER_IN_THE_OPLOG = "no longer be in the oplog";
private MongoUtils() {}
@@ -455,14 +458,13 @@ public class MongoUtils {
// This check is stricter than checkIfChangeStreamCursorExpires, which specifically
// checks if given exception is caused by an expired resume token.
public static boolean checkIfResumeTokenExpires(final MongoCommandException e) {
- if (e.getCode() != CHANGE_STREAM_FATAL_ERROR) {
+ if (e.getCode() != CHANGE_STREAM_FATAL_ERROR && e.getCode() != CHANGE_STREAM_HISTORY_LOST) {
return false;
}
String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
- return (errorMessage.contains(RESUME_TOKEN))
- && (errorMessage.contains(NOT_FOUND)
- || errorMessage.contains(DOES_NOT_EXIST)
- || errorMessage.contains(INVALID_RESUME_TOKEN)
- || errorMessage.contains(NO_LONGER_IN_THE_OPLOG));
+ return errorMessage.contains(RESUME_TOKEN)
+ || errorMessage.contains(NOT_FOUND)
+ || errorMessage.contains(DOES_NOT_EXIST)
+ || errorMessage.contains(NO_LONGER_IN_THE_OPLOG);
}
}