gguptp commented on code in PR #151:
URL: 
https://github.com/apache/flink-connector-aws/pull/151#discussion_r1741930933


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int 
subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not 
supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent 
splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, 
Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
+        }
+
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+        if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+            LOG.error(
+                    "There are inconsistencies in DescribeStream which we were 
not able to resolve. First leaf node on which inconsistency was detected:"
+                            + 
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+            return;
+        }
+
+        splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+        splitTracker.removeSplits(
+                splitGraphInconsistencyTracker.getNodes().stream()
+                        .map(Shard::shardId)
+                        .collect(Collectors.toSet()));

Review Comment:
   Renamed it to `cleanUpOldFinishedSplits`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to