gguptp commented on code in PR #151: URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1741930933
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ########## @@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) { - if (!splitAssignment.containsKey(subtaskId)) { - LOG.warn( - "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}", - subtaskId, - splits); + public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) { + throw new UnsupportedOperationException("Partial recovery is not supported"); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SplitsFinishedEvent) { + handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent); + } + } + + private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) { + splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); + splitAssignment + .get(subtaskId) + .removeIf( + split -> + splitsFinishedEvent + .getFinishedSplitIds() + .contains(split.splitId())); + assignSplits(); + } + + private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) { + if (throwable != null) { + throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable); + } + + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + trackSplitsAndResolveInconsistencies(discoveredSplits); + + if (splitGraphInconsistencyTracker.inconsistencyDetected()) { + LOG.error( + "There are inconsistencies in DescribeStream which we were not able to resolve. First leaf node on which inconsistency was detected:" + + splitGraphInconsistencyTracker.getEarliestClosedLeafNode()); + return; + } + + splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes()); + splitTracker.removeSplits( + splitGraphInconsistencyTracker.getNodes().stream() + .map(Shard::shardId) + .collect(Collectors.toSet())); Review Comment: renamed it to `cleanUpOldFinishedSplits` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org