zentol commented on code in PR #3:
URL:
https://github.com/apache/flink-connector-mongodb/pull/3#discussion_r1105623716
##
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumStateSerializerTest.java:
##
@@ -76,6 +76,7 @@ private static MongoScanSourceSplit createSourceSplit(int index) {
"coll",
new BsonDocument("_id", new BsonInt32(index)),
new BsonDocument("_id", MongoConstants.BSON_MAX_KEY),
-ID_HINT);
+ID_HINT,
+index);
Review Comment:
This change shouldn't be necessary.
##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/split/MongoSourceSplitState.java:
##
@@ -19,28 +19,19 @@
import org.apache.flink.annotation.PublicEvolving;
+import org.bson.BsonDocument;
+
/** MongoDB source split state. */
@PublicEvolving
-public class MongoSourceSplitState {
+public abstract class MongoSourceSplitState {
Review Comment:
I don't necessarily disagree with this change, just curious what brought it
on. It doesn't seem to be required for fixing the issue at hand.
##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java:
##
@@ -181,6 +181,11 @@ private MongoCursor getOrCreateCursor() {
.hint(currentSplit.getHint())
.noCursorTimeout(readOptions.isNoCursorTimeout());
+// Current split is partial read and recovered from checkpoint.
Review Comment:
```suggestion
// Current split was partially read and recovered from checkpoint
```
##
flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/split/MongoSourceSplitState.java:
##
@@ -19,28 +19,19 @@
import org.apache.flink.annotation.PublicEvolving;
+import org.bson.BsonDocument;
+
/** MongoDB source split state. */
@PublicEvolving
-public class MongoSourceSplitState {
+public abstract class MongoSourceSplitState {
-private final MongoSourceSplit split;
+protected final MongoSourceSplit split;
public MongoSourceSplitState(MongoSourceSplit split) {
this.split = split;
}
-public MongoSourceSplit toMongoSourceSplit() {
-if (split instanceof MongoScanSourceSplit) {
-MongoScanSourceSplit scanSplit = (MongoScanSourceSplit) split;
-return new MongoScanSourceSplit(
-scanSplit.splitId(),
-scanSplit.getDatabase(),
-scanSplit.getCollection(),
-scanSplit.getMin(),
-scanSplit.getMax(),
-scanSplit.getHint());
-} else {
-throw new IllegalStateException("Unknown split type");
-}
-}
+public abstract MongoSourceSplit toMongoSourceSplit();
+
+public abstract void updateOffset(BsonDocument record);
Review Comment:
Not sure why we pass in the record; we're just incrementing the count. Will
this be required in the future?
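To make the question concrete, here is a minimal sketch of a parameterless variant, assuming the offset really is just a count of emitted records. All names (`SketchSplit`, `SketchSplitState`, `SketchReader`) are hypothetical stand-ins and not the connector's classes; a plain `List` stands in for the MongoDB cursor, so this only illustrates the counting and resume logic, not the actual implementation in the PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical immutable split: an id plus the number of records already emitted. */
final class SketchSplit {
    final String splitId;
    final int offset;

    SketchSplit(String splitId, int offset) {
        this.splitId = splitId;
        this.offset = offset;
    }
}

/** Hypothetical mutable state that only counts records; it never inspects them. */
final class SketchSplitState {
    private final SketchSplit split;
    private int offset;

    SketchSplitState(SketchSplit split) {
        this.split = split;
        this.offset = split.offset; // resume from the checkpointed position
    }

    /** No record parameter: the state only increments a counter. */
    void updateOffset() {
        offset++;
    }

    /** Snapshot the current progress into a new split for checkpointing. */
    SketchSplit toSplit() {
        return new SketchSplit(split.splitId, offset);
    }
}

/** Hypothetical reader; the List plays the role of the cursor over the split's range. */
final class SketchReader {
    /** Reads the remaining records of a split, skipping those already emitted. */
    static List<String> readRemaining(List<String> source, SketchSplitState state) {
        Iterator<String> it = source.iterator();
        int toSkip = state.toSplit().offset;
        for (int i = 0; i < toSkip && it.hasNext(); i++) {
            it.next(); // split was partially read: drop records emitted before the checkpoint
        }
        List<String> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
            state.updateOffset(); // one increment per emitted record
        }
        return out;
    }
}
```

If a later offset scheme needs the record itself (for example, to track the last seen `_id` instead of a count), the parameter would be justified; with a pure counter it is not used.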
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.