[GitHub] [flink-connector-mongodb] zentol commented on a diff in pull request #3: [FLINK-31063] Prevent duplicate reading when restoring from a checkpoint.

2023-02-16 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/3#discussion_r1105623716


##
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumStateSerializerTest.java:
##
@@ -76,6 +76,7 @@ private static MongoScanSourceSplit createSourceSplit(int index) {
 "coll",
 new BsonDocument("_id", new BsonInt32(index)),
 new BsonDocument("_id", MongoConstants.BSON_MAX_KEY),
-ID_HINT);
+ID_HINT,
+index);

Review Comment:
   This change shouldn't be necessary.



##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/split/MongoSourceSplitState.java:
##
@@ -19,28 +19,19 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import org.bson.BsonDocument;
+
 /** MongoDB source split state. */
 @PublicEvolving
-public class MongoSourceSplitState {
+public abstract class MongoSourceSplitState {

Review Comment:
   I don't necessarily disagree with this change; I'm just curious what brought it on. It doesn't seem to be required for fixing the issue at hand.



##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java:
##
@@ -181,6 +181,11 @@ private MongoCursor getOrCreateCursor() {
 .hint(currentSplit.getHint())
 .noCursorTimeout(readOptions.isNoCursorTimeout());
 
+// Current split is partial read and recovered from checkpoint.

Review Comment:
   ```suggestion
   // Current split was partially read and recovered from checkpoint
   ```
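
For context, a minimal sketch of how a partially read split could resume without re-emitting records. It assumes the split state tracks the number of records already emitted and that the query result has a stable order; `OffsetResumingReaderSketch` and its in-memory list are hypothetical stand-ins, not the connector's classes or the MongoDB driver API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a split reader; not the connector's actual class. */
class OffsetResumingReaderSketch {
    // Stands in for a deterministic, ordered query result (assumption).
    private final List<String> records;
    // Number of records already emitted; this is the value that gets checkpointed.
    private int offset;

    OffsetResumingReaderSketch(List<String> records, int restoredOffset) {
        this.records = records;
        this.offset = restoredOffset;
    }

    /** Emits at most {@code max} records, skipping everything emitted before. */
    List<String> readUpTo(int max) {
        List<String> out = new ArrayList<>();
        // Skip the records that were read before the checkpoint.
        Iterator<String> cursor = records.stream().skip(offset).iterator();
        while (cursor.hasNext() && out.size() < max) {
            out.add(cursor.next());
            offset++;
        }
        return out;
    }

    int getOffset() {
        return offset;
    }
}
```

A reader restored with the checkpointed offset continues at the first record that was not yet emitted, which only holds if the underlying scan returns records in the same order after recovery.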



##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/split/MongoSourceSplitState.java:
##
@@ -19,28 +19,19 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import org.bson.BsonDocument;
+
 /** MongoDB source split state. */
 @PublicEvolving
-public class MongoSourceSplitState {
+public abstract class MongoSourceSplitState {
 
-private final MongoSourceSplit split;
+protected final MongoSourceSplit split;
 
 public MongoSourceSplitState(MongoSourceSplit split) {
 this.split = split;
 }
 
-public MongoSourceSplit toMongoSourceSplit() {
-if (split instanceof MongoScanSourceSplit) {
-MongoScanSourceSplit scanSplit = (MongoScanSourceSplit) split;
-return new MongoScanSourceSplit(
-scanSplit.splitId(),
-scanSplit.getDatabase(),
-scanSplit.getCollection(),
-scanSplit.getMin(),
-scanSplit.getMax(),
-scanSplit.getHint());
-} else {
-throw new IllegalStateException("Unknown split type");
-}
-}
+public abstract MongoSourceSplit toMongoSourceSplit();
+
+public abstract void updateOffset(BsonDocument record);

Review Comment:
   Not sure why we pass in the record; we're just incrementing the count. Will 
this be required in the future?
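
To illustrate the question, a small sketch contrasting the two cases. Both classes are hypothetical and use `String` in place of `BsonDocument`: a count-based offset never inspects the record, whereas a key-based offset (shown only as one conceivable future variant, not something the PR proposes) would need it.

```java
/** Count-based offset: the argument is never inspected. */
final class CountingOffsetSketch {
    private int offset;

    void updateOffset(String ignoredRecord) {
        offset++; // only the number of emitted records matters
    }

    int getOffset() {
        return offset;
    }
}

/** Hypothetical key-based variant: here the record content is required. */
final class LastKeyOffsetSketch {
    private String lastKey;

    void updateOffset(String recordKey) {
        lastKey = recordKey; // remembers where to resume from
    }

    String getLastKey() {
        return lastKey;
    }
}
```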



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-mongodb] zentol commented on a diff in pull request #3: [FLINK-31063] Prevent duplicate reading when restoring from a checkpoint.

2023-02-17 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/3#discussion_r1109995851


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/split/MongoSourceSplitState.java:
##
@@ -19,28 +19,19 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import org.bson.BsonDocument;
+
 /** MongoDB source split state. */
 @PublicEvolving
-public class MongoSourceSplitState {
+public abstract class MongoSourceSplitState {

Review Comment:
   Maybe consider introducing a `MongoSourceSplitState` interface instead. We 
then have more freedom to change the internals than when we make the 
implementation itself part of the API.
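
A rough sketch of what this could look like, under the assumption that only the contract needs to be public. All type names ending in `Sketch` are hypothetical stand-ins for the connector's classes, and `Object` stands in for `BsonDocument` to keep the example free of dependencies.

```java
/** Hypothetical stand-in for MongoSourceSplit. */
interface SplitSketch {
    String splitId();
}

/** The public contract: the only thing callers may rely on. */
interface SplitStateSketch {
    SplitSketch toSplit();

    void updateOffset(Object record);
}

/** Hypothetical stand-in for MongoScanSourceSplit. */
final class ScanSplitSketch implements SplitSketch {
    private final String id;
    private final int offset;

    ScanSplitSketch(String id, int offset) {
        this.id = id;
        this.offset = offset;
    }

    @Override
    public String splitId() {
        return id;
    }

    int getOffset() {
        return offset;
    }
}

/** Internal implementation: fields can change without touching the contract. */
final class ScanSplitStateSketch implements SplitStateSketch {
    private final String splitId;
    private int offset;

    ScanSplitStateSketch(ScanSplitSketch split) {
        this.splitId = split.splitId();
        this.offset = split.getOffset();
    }

    @Override
    public SplitSketch toSplit() {
        // Snapshot the current progress into an immutable split.
        return new ScanSplitSketch(splitId, offset);
    }

    @Override
    public void updateOffset(Object record) {
        offset++;
    }
}
```

With this shape, the implementation class no longer has to expose a `protected` field or a public constructor as part of the API surface.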






[GitHub] [flink-connector-mongodb] zentol commented on a diff in pull request #3: [FLINK-31063] Prevent duplicate reading when restoring from a checkpoint.

2023-02-20 Thread via GitHub


zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/3#discussion_r691646


##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/split/MongoScanSourceSplitState.java:
##
@@ -23,18 +23,19 @@
 
 /** MongoDB source split state for {@link MongoScanSourceSplit}. */
 @PublicEvolving

Review Comment:
   This can now be `@Internal`?


