gguptp commented on code in PR #151:
URL: 
https://github.com/apache/flink-connector-aws/pull/151#discussion_r1726819861


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorStateSerializer.java:
##########
@@ -29,15 +29,19 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 /** Used to serialize and deserialize the {@link 
DynamoDbStreamsSourceEnumeratorState}. */
 @Internal
 public class DynamoDbStreamsSourceEnumeratorStateSerializer
         implements 
SimpleVersionedSerializer<DynamoDbStreamsSourceEnumeratorState> {
 
-    private static final int CURRENT_VERSION = 0;
+    private static final Set<Integer> COMPATIBLE_VERSIONS = new 
HashSet<>(Arrays.asList(0, 1));

Review Comment:
   makes sense, we can keep it as 0



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int 
subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not 
supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent 
splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, 
Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
+        }
+
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+        if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+            LOG.error(
+                    "There are inconsistencies in DescribeStream which we were 
not able to resolve. First leaf node on which inconsistency was detected:"
+                            + 
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+            return;
+        }
+
+        splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+        splitTracker.removeSplits(
+                splitGraphInconsistencyTracker.getNodes().stream()
+                        .map(Shard::shardId)
+                        .collect(Collectors.toSet()));
+        if (context.registeredReaders().size() < context.currentParallelism()) 
{
+            LOG.info(
+                    "Insufficient registered readers, skipping assignment of 
discovered splits until all readers are registered. Required number of readers: 
{}, registered readers: {}",
+                    context.currentParallelism(),
+                    context.registeredReaders().size());
             return;
         }
+        assignSplits();
+    }
 
-        for (DynamoDbStreamsShardSplit split : splits) {
-            splitAssignment.get(subtaskId).remove(split);
-            assignedSplitIds.remove(split.splitId());
-            unassignedSplits.add(split);
+    /**
+     * This method tracks the discovered splits in a graph and if the graph 
has inconsistencies, it
+     * tries to resolve them using DescribeStream calls using the first 
inconsistent node found in
+     * the split graph.
+     *
+     * @param discoveredSplits splits discovered after calling DescribeStream 
at the start of the
+     *     application or periodically.
+     */
+    private SplitGraphInconsistencyTracker 
trackSplitsAndResolveInconsistencies(
+            ListShardsResult discoveredSplits) {
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                new SplitGraphInconsistencyTracker();
+        splitGraphInconsistencyTracker.addNodes(discoveredSplits.getShards());
+
+        boolean streamDisabled = 
discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+        int describeStreamInconsistencyResolutionCount =
+                
sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+        for (int i = 0;
+                i < describeStreamInconsistencyResolutionCount
+                        && !streamDisabled

Review Comment:
   The reason is for describestream there will never be any open child shard, 
so it would always fall under inconsistency.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int 
subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not 
supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent 
splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, 
Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
+        }
+
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+        if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+            LOG.error(
+                    "There are inconsistencies in DescribeStream which we were 
not able to resolve. First leaf node on which inconsistency was detected:"
+                            + 
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+            return;
+        }
+
+        splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+        splitTracker.removeSplits(
+                splitGraphInconsistencyTracker.getNodes().stream()
+                        .map(Shard::shardId)
+                        .collect(Collectors.toSet()));
+        if (context.registeredReaders().size() < context.currentParallelism()) 
{
+            LOG.info(
+                    "Insufficient registered readers, skipping assignment of 
discovered splits until all readers are registered. Required number of readers: 
{}, registered readers: {}",
+                    context.currentParallelism(),
+                    context.registeredReaders().size());
             return;
         }
+        assignSplits();
+    }
 
-        for (DynamoDbStreamsShardSplit split : splits) {
-            splitAssignment.get(subtaskId).remove(split);
-            assignedSplitIds.remove(split.splitId());
-            unassignedSplits.add(split);
+    /**
+     * This method tracks the discovered splits in a graph and if the graph 
has inconsistencies, it
+     * tries to resolve them using DescribeStream calls using the first 
inconsistent node found in
+     * the split graph.
+     *
+     * @param discoveredSplits splits discovered after calling DescribeStream 
at the start of the
+     *     application or periodically.
+     */
+    private SplitGraphInconsistencyTracker 
trackSplitsAndResolveInconsistencies(
+            ListShardsResult discoveredSplits) {
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                new SplitGraphInconsistencyTracker();
+        splitGraphInconsistencyTracker.addNodes(discoveredSplits.getShards());
+
+        boolean streamDisabled = 
discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+        int describeStreamInconsistencyResolutionCount =
+                
sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+        for (int i = 0;
+                i < describeStreamInconsistencyResolutionCount
+                        && !streamDisabled
+                        && 
splitGraphInconsistencyTracker.inconsistencyDetected();
+                i++) {
+            ListShardsResult shardsToResolveInconsistencies =

Review Comment:
   updated with a LOG.warn



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java:
##########
@@ -203,6 +218,50 @@ private DynamoDbStreamsProxy 
createDynamoDbStreamsProxy(Configuration consumerCo
         return new DynamoDbStreamsProxy(dynamoDbStreamsClient, httpClient);
     }
 
+    private DynamoDbStreamsProxy createDynamoDbStreamsProxyWithRetries(
+            Configuration consumerConfig) {
+        SdkHttpClient httpClient =
+                AWSGeneralUtil.createSyncHttpClient(
+                        AttributeMap.builder().build(), 
ApacheHttpClient.builder());
+
+        Properties dynamoDbStreamsClientProperties = new Properties();
+        String region =
+                AWSGeneralUtil.getRegionFromArn(streamArn)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Unable to determine region 
from stream arn"));
+        dynamoDbStreamsClientProperties.put(AWSConfigConstants.AWS_REGION, 
region);
+        consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);
+
+        AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);
+        int maxDescribeStreamCallAttempts =
+                sourceConfig.getInteger(DESCRIBE_STREAM_RETRY_CALL_COUNT);
+        Duration minDescribeStreamDelay = 
sourceConfig.get(DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN);
+        Duration maxDescribeStreamDelay = 
sourceConfig.get(DESCRIBE_STREAM_EXPONENTIAL_DELAY_MAX);
+        BackoffStrategy backoffStrategy =
+                BackoffStrategy.exponentialDelay(minDescribeStreamDelay, 
maxDescribeStreamDelay);
+        AdaptiveRetryStrategy adaptiveRetryStrategy =
+                AdaptiveRetryStrategy.builder()
+                        .maxAttempts(maxDescribeStreamCallAttempts)
+                        .backoffStrategy(backoffStrategy)
+                        .throttlingBackoffStrategy(backoffStrategy)
+                        .build();

Review Comment:
   https://issues.apache.org/jira/browse/FLINK-36195
   
   Added jira for same



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any 
unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();

Review Comment:
   We are not doing incremental shard discovery and not putting finishedSplits 
causes reprocessing



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any 
unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();
+    private final String streamArn;
+    private final InitialPosition initialPosition;
+
+    public SplitTracker(String streamArn, InitialPosition initialPosition) {
+        this(Collections.emptyList(), streamArn, initialPosition);
+    }
+
+    public SplitTracker(
+            List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+            String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition 
initialPosition) {
+        this.streamArn = streamArn;
+        this.initialPosition = initialPosition;
+        initialState.forEach(
+                splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = 
splitWithStatus.split();
+                    knownSplits.put(currentSplit.splitId(), currentSplit);
+
+                    if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+                        assignedSplits.add(splitWithStatus.split().splitId());
+                    }
+                    if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+                        finishedSplits.add(splitWithStatus.split().splitId());
+                    }
+                });
+    }
+
+    /**
+     * Add newly discovered splits to tracker.
+     *
+     * @param shardsToAdd collection of splits to add to tracking
+     */
+    public void addSplits(Collection<Shard> shardsToAdd) {
+        List<DynamoDbStreamsShardSplit> newSplitsToAdd =
+                determineNewShardsToBePutForAssignment(shardsToAdd);
+        newSplitsToAdd.forEach(
+                split -> {
+                    knownSplits.put(split.splitId(), split);
+                });
+    }
+
+    private List<Shard> getOpenShards(Collection<Shard> shards) {
+        List<Shard> openShards = new ArrayList<>();
+        for (Shard shard : shards) {
+            if (shard.sequenceNumberRange().endingSequenceNumber() == null) {
+                openShards.add(shard);
+            }
+        }
+        return openShards;
+    }
+
+    private Map<String, Shard> getShardIdToShardMap(Collection<Shard> shards) {
+        Map<String, Shard> shardIdToShardMap = new HashMap<>();
+        for (Shard shard : shards) {
+            shardIdToShardMap.put(shard.shardId(), shard);
+        }
+        return shardIdToShardMap;
+    }
+
+    /**
+     * This function finds the open shards returned from describeStream 
operation and adds them
+     * along with their parents if parents are not already tracked to the 
tracked splits
+     *
+     * <p>This is needed because describestream has an inconsistency where for 
example if a shard s
+     * is split into s1 and s2, in one describestream operation, its possible 
that only one of s1
+     * and s2 is returned.
+     *
+     * <p>We will go up the shard lineage until we find a parent shard which 
is not yet tracked by
+     * SplitTracker If no ancestor is tracked, the first ancestor will be read 
from the initial
+     * position configured and all its descendants will be read from 
TRIM_HORIZON

Review Comment:
   after modifying the SplitTracker algo, this function has been removed.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int 
subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not 
supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }

Review Comment:
   added a UT for this 



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##########
@@ -62,7 +61,7 @@ public DynamoDbStreamsProxy(
     }
 
     @Override
-    public List<Shard> listShards(String streamArn, @Nullable String 
lastSeenShardId) {
+    public ListShardsResult listShards(String streamArn, @Nullable String 
lastSeenShardId) {

Review Comment:
   we use the lastSeenShardId in the ShardGraphInconsistencyTracker



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java:
##########
@@ -203,6 +218,50 @@ private DynamoDbStreamsProxy 
createDynamoDbStreamsProxy(Configuration consumerCo
         return new DynamoDbStreamsProxy(dynamoDbStreamsClient, httpClient);
     }
 
+    private DynamoDbStreamsProxy createDynamoDbStreamsProxyWithRetries(
+            Configuration consumerConfig) {
+        SdkHttpClient httpClient =
+                AWSGeneralUtil.createSyncHttpClient(
+                        AttributeMap.builder().build(), 
ApacheHttpClient.builder());
+
+        Properties dynamoDbStreamsClientProperties = new Properties();
+        String region =
+                AWSGeneralUtil.getRegionFromArn(streamArn)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Unable to determine region 
from stream arn"));
+        dynamoDbStreamsClientProperties.put(AWSConfigConstants.AWS_REGION, 
region);
+        consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);
+
+        AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);
+        int maxDescribeStreamCallAttempts =
+                sourceConfig.getInteger(DESCRIBE_STREAM_RETRY_CALL_COUNT);
+        Duration minDescribeStreamDelay = 
sourceConfig.get(DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN);
+        Duration maxDescribeStreamDelay = 
sourceConfig.get(DESCRIBE_STREAM_EXPONENTIAL_DELAY_MAX);
+        BackoffStrategy backoffStrategy =
+                BackoffStrategy.exponentialDelay(minDescribeStreamDelay, 
maxDescribeStreamDelay);
+        AdaptiveRetryStrategy adaptiveRetryStrategy =
+                AdaptiveRetryStrategy.builder()
+                        .maxAttempts(maxDescribeStreamCallAttempts)
+                        .backoffStrategy(backoffStrategy)
+                        .throttlingBackoffStrategy(backoffStrategy)
+                        .build();

Review Comment:
   added jira here: https://issues.apache.org/jira/browse/FLINK-36195



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int 
subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not 
supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent 
splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, 
Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
+        }
+
+        ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+        shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+        boolean streamDisabled = 
discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+        int describeStreamInconsistencyResolutionCount =
+                
sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);

Review Comment:
   abstracted it out to a private method to improve readability here



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -43,6 +45,15 @@ public enum InitialPosition {
                     .defaultValue(10000L)
                     .withDescription("The interval between each attempt to 
discover new shards.");
 
+    public static final ConfigOption<Integer> 
DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+            
ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry build shard lineage 
if describestream returns inconsistent response");
+
+    public static final Duration MIN_DDB_STREAMS_SHARD_RETENTION = 
Duration.ofHours(6);

Review Comment:
   removed this



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -40,9 +42,39 @@ public enum InitialPosition {
     public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
             ConfigOptions.key("flink.shard.discovery.intervalmillis")
                     .longType()
-                    .defaultValue(10000L)
+                    .defaultValue(60000L)
                     .withDescription("The interval between each attempt to 
discover new shards.");
 
+    public static final ConfigOption<Integer> 
DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+            
ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry build shard lineage 
if describestream returns inconsistent response");
+
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_RETRY_CALL_COUNT 
=
+            ConfigOptions.key("flink.describestream.numretries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry describestream call 
if it returns a retriable exception");
+
+    public static final ConfigOption<Integer> 
DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN =
+            ConfigOptions.key("flink.describestream.backoff.mindelay")
+                    .intType()

Review Comment:
   changed it to duration type



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTracker.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+/**
+ * Class to track the state of shard graph created as part of describestream 
operation. This will
+ * track for inconsistent shards returned due to describestream operation. 
Caller will have to call
+ * describestream again with the chronologically first leaf node to resolve 
inconsistencies.
+ */
+public class SplitGraphInconsistencyTracker {
+    private final TreeSet<String> closedLeafNodes;
+    private final Map<String, Shard> nodes;
+
+    public SplitGraphInconsistencyTracker() {
+        nodes = new HashMap<>();
+        closedLeafNodes = new TreeSet<>();
+    }
+
+    public void addNodes(List<Shard> shards) {
+        for (Shard shard : shards) {
+            addNode(shard);
+        }
+    }
+
+    private void addNode(Shard shard) {
+        nodes.put(shard.shardId(), shard);
+        if (shard.sequenceNumberRange().endingSequenceNumber() != null) {
+            closedLeafNodes.add(shard.shardId());
+        }
+        if (shard.parentShardId() != null) {
+            closedLeafNodes.remove(shard.parentShardId());
+        }

Review Comment:
   The return order is a guarantee of DDBStreams. Streams KCL adapter also 
takes this dependency.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java:
##########
@@ -203,6 +218,50 @@ private DynamoDbStreamsProxy 
createDynamoDbStreamsProxy(Configuration consumerCo
         return new DynamoDbStreamsProxy(dynamoDbStreamsClient, httpClient);
     }
 
+    private DynamoDbStreamsProxy createDynamoDbStreamsProxyWithRetries(
+            Configuration consumerConfig) {
+        SdkHttpClient httpClient =
+                AWSGeneralUtil.createSyncHttpClient(
+                        AttributeMap.builder().build(), 
ApacheHttpClient.builder());
+
+        Properties dynamoDbStreamsClientProperties = new Properties();
+        String region =
+                AWSGeneralUtil.getRegionFromArn(streamArn)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Unable to determine region 
from stream arn"));
+        dynamoDbStreamsClientProperties.put(AWSConfigConstants.AWS_REGION, 
region);
+        consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);
+
+        AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);
+        int maxDescribeStreamCallAttempts =
+                sourceConfig.getInteger(DESCRIBE_STREAM_RETRY_CALL_COUNT);

Review Comment:
   updated



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int 
subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not 
supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent 
splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, 
Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
+        }
+
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+        if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+            LOG.error(
+                    "There are inconsistencies in DescribeStream which we were 
not able to resolve. First leaf node on which inconsistency was detected:"
+                            + 
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+            return;
+        }
+
+        splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+        splitTracker.removeSplits(
+                splitGraphInconsistencyTracker.getNodes().stream()
+                        .map(Shard::shardId)
+                        .collect(Collectors.toSet()));

Review Comment:
   the removeSplits function removes the splits which are finished and have 
been created more than 24 hours ago.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -162,76 +237,16 @@ private List<DynamoDbStreamsShardSplit> 
initialDiscoverSplits() {
      *
      * @return list of discovered splits
      */
-    private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() {
-        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
-
-        // Any shard discovered after the initial startup should be read from 
the start, since they
-        // come from resharding
-        return mapToSplits(shards, InitialPosition.TRIM_HORIZON);
-    }
-
-    private List<DynamoDbStreamsShardSplit> mapToSplits(
-            List<Shard> shards, InitialPosition initialPosition) {
-        StartingPosition startingPosition;
-        switch (initialPosition) {
-            case LATEST:
-                startingPosition = StartingPosition.latest();
-                break;
-            case TRIM_HORIZON:
-            default:
-                startingPosition = StartingPosition.fromStart();
-        }
-
-        List<DynamoDbStreamsShardSplit> splits = new ArrayList<>();
-        for (Shard shard : shards) {
-            splits.add(new DynamoDbStreamsShardSplit(streamArn, 
shard.shardId(), startingPosition));
-        }
-
-        return splits;
-    }
-
-    /**
-     * This method assigns a given set of DynamoDb Streams splits to the 
readers currently
-     * registered on the cluster. This assignment is done via a side-effect on 
the {@link
-     * SplitEnumeratorContext} object.
-     *
-     * @param discoveredSplits list of discovered splits
-     * @param throwable thrown when discovering splits. Will be null if no 
throwable thrown.
-     */
-    private void assignSplits(
-            List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable 
throwable) {
-        if (throwable != null) {
-            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
-        }
-
-        if (context.registeredReaders().size() < context.currentParallelism()) 
{
-            LOG.info(
-                    "Insufficient registered readers, skipping assignment of 
discovered splits until all readers are registered. Required number of readers: 
{}, Registered readers: {}",
-                    context.currentParallelism(),
-                    context.registeredReaders().size());
-            unassignedSplits.addAll(discoveredSplits);
-            return;
-        }
-
-        Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = 
new HashMap<>();
-        for (DynamoDbStreamsShardSplit split : unassignedSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-        unassignedSplits.clear();
-        for (DynamoDbStreamsShardSplit split : discoveredSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-
-        updateLastSeenShardId(discoveredSplits);
-        updateSplitAssignment(newSplitAssignments);
-        context.assignSplits(new SplitsAssignment<>(newSplitAssignments));
+    private ListShardsResult periodicallyDiscoverSplits() {

Review Comment:
   I combined both methods and renamed it to discoverSplits



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorStateSerializer.java:
##########
@@ -93,30 +86,25 @@ public DynamoDbStreamsSourceEnumeratorState deserialize(
                                 + ". Serializer version is "
                                 + getVersion());
             }
+            final int numKnownSplits = in.readInt();
+            final int splitSerializerVersion = in.readInt();
 
-            String lastSeenShardId = null;
-
-            final boolean hasLastSeenShardId = in.readBoolean();
-            if (hasLastSeenShardId) {
-                lastSeenShardId = in.readUTF();
-            }
+            List<DynamoDBStreamsShardSplitWithAssignmentStatus> knownSplits =
+                    new ArrayList<>(numKnownSplits);
 
-            final int numUnassignedSplits = in.readInt();
-            final int splitSerializerVersion = in.readInt();
-            if (splitSerializerVersion != splitSerializer.getVersion()) {
-                throw new VersionMismatchException(
-                        "Trying to deserialize DynamoDbStreamsShardSplit 
serialized with unsupported version "
-                                + splitSerializerVersion
-                                + ". Serializer version is "
-                                + splitSerializer.getVersion());
-            }
-            Set<DynamoDbStreamsShardSplit> unassignedSplits = new 
HashSet<>(numUnassignedSplits);
-            for (int i = 0; i < numUnassignedSplits; i++) {
+            for (int i = 0; i < numKnownSplits; i++) {
                 int serializedLength = in.readInt();
                 byte[] serializedSplit = new byte[serializedLength];
                 if (in.read(serializedSplit) != -1) {
-                    unassignedSplits.add(
-                            
splitSerializer.deserialize(splitSerializerVersion, serializedSplit));
+                    DynamoDbStreamsShardSplit deserializedSplit =
+                            
splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
+                    SplitAssignmentStatus assignmentStatus = 
SplitAssignmentStatus.UNASSIGNED;
+                    if (version == CURRENT_VERSION) {
+                        assignmentStatus = 
SplitAssignmentStatus.fromStatusCode(in.readInt());
+                    }

Review Comment:
   makes sense. Removed this



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int 
subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not 
supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent 
splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, 
Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
+        }
+
+        SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
+                trackSplitsAndResolveInconsistencies(discoveredSplits);
+
+        if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
+            LOG.error(
+                    "There are inconsistencies in DescribeStream which we were 
not able to resolve. First leaf node on which inconsistency was detected:"
+                            + 
splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
+            return;
+        }
+
+        splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes());
+        splitTracker.removeSplits(
+                splitGraphInconsistencyTracker.getNodes().stream()
+                        .map(Shard::shardId)
+                        .collect(Collectors.toSet()));
+        if (context.registeredReaders().size() < context.currentParallelism()) 
{
+            LOG.info(
+                    "Insufficient registered readers, skipping assignment of 
discovered splits until all readers are registered. Required number of readers: 
{}, registered readers: {}",
+                    context.currentParallelism(),
+                    context.registeredReaders().size());
             return;
         }
+        assignSplits();
+    }
 
-        for (DynamoDbStreamsShardSplit split : splits) {
-            splitAssignment.get(subtaskId).remove(split);
-            assignedSplitIds.remove(split.splitId());
-            unassignedSplits.add(split);
+    /**
+     * This method tracks the discovered splits in a graph and if the graph 
has inconsistencies, it
+     * tries to resolve them using DescribeStream calls using the first 
inconsistent node found in
+     * the split graph.
+     *
+     * @param discoveredSplits splits discovered after calling DescribeStream 
at the start of the
+     *     application or periodically.
+     */
+    private SplitGraphInconsistencyTracker 
trackSplitsAndResolveInconsistencies(
+            ListShardsResult discoveredSplits) {

Review Comment:
   I made some optimizations in the initial discovery thread so that it will 
not run if theres already a state maintained in flink state. I thought about 
removing this inconsistency check but the problem is that the inconsistencies 
happen more for large streams. Given for large streams, describestream itself 
would take quite a long time, and if due to inconsistency, splits don't get put 
in SplitTracker, it would mean essentially double the time to fetch the splits 
and do some work



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any 
unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();
+    private final String streamArn;
+    private final InitialPosition initialPosition;
+
+    public SplitTracker(String streamArn, InitialPosition initialPosition) {
+        this(Collections.emptyList(), streamArn, initialPosition);
+    }
+
+    public SplitTracker(
+            List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+            String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition 
initialPosition) {
+        this.streamArn = streamArn;
+        this.initialPosition = initialPosition;
+        initialState.forEach(
+                splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = 
splitWithStatus.split();
+                    knownSplits.put(currentSplit.splitId(), currentSplit);
+
+                    if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+                        assignedSplits.add(splitWithStatus.split().splitId());
+                    }
+                    if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+                        finishedSplits.add(splitWithStatus.split().splitId());
+                    }
+                });
+    }
+
+    /**
+     * Add newly discovered splits to tracker.
+     *
+     * @param shardsToAdd collection of splits to add to tracking
+     */
+    public void addSplits(Collection<Shard> shardsToAdd) {
+        List<DynamoDbStreamsShardSplit> newSplitsToAdd =
+                determineNewShardsToBePutForAssignment(shardsToAdd);
+        newSplitsToAdd.forEach(
+                split -> {
+                    knownSplits.put(split.splitId(), split);
+                });
+    }
+
+    private List<Shard> getOpenShards(Collection<Shard> shards) {
+        List<Shard> openShards = new ArrayList<>();
+        for (Shard shard : shards) {
+            if (shard.sequenceNumberRange().endingSequenceNumber() == null) {
+                openShards.add(shard);
+            }
+        }
+        return openShards;
+    }
+
+    private Map<String, Shard> getShardIdToShardMap(Collection<Shard> shards) {
+        Map<String, Shard> shardIdToShardMap = new HashMap<>();
+        for (Shard shard : shards) {
+            shardIdToShardMap.put(shard.shardId(), shard);
+        }
+        return shardIdToShardMap;
+    }
+
+    /**
+     * This function finds the open shards returned from describeStream 
operation and adds them
+     * along with their parents if parents are not already tracked to the 
tracked splits
+     *
+     * <p>This is needed because describestream has an inconsistency where for 
example if a shard s
+     * is split into s1 and s2, in one describestream operation, its possible 
that only one of s1
+     * and s2 is returned.
+     *
+     * <p>We will go up the shard lineage until we find a parent shard which 
is not yet tracked by
+     * SplitTracker If no ancestor is tracked, the first ancestor will be read 
from the initial
+     * position configured and all its descendants will be read from 
TRIM_HORIZON
+     *
+     * @param shards the shards returned from DescribeStream operation
+     * @return list of {@link DynamoDbStreamsShardSplit} which will be put to 
tracked splits
+     */
+    private List<DynamoDbStreamsShardSplit> 
determineNewShardsToBePutForAssignment(

Review Comment:
   made the algo simpler



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any 
unassigned splits. */

Review Comment:
   added a comment.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/metrics/DynamoDbStreamsShardMetrics.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.metrics;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.arns.Arn;
+
+/** A utility class for handling dynamodb streams shard metrics. */
+@Internal
+public class DynamoDbStreamsShardMetrics {
+    private static final Logger log = 
LoggerFactory.getLogger(DynamoDbStreamsShardMetrics.class);
+    private final MetricGroup metricGroup;
+    private final DynamoDbStreamsShardSplit shardInfo;
+    private volatile long millisBehindLatest = -1;
+
+    public DynamoDbStreamsShardMetrics(
+            DynamoDbStreamsShardSplit shardInfo, MetricGroup rootMetricGroup) {
+        this.shardInfo = shardInfo;
+        Arn streamArn = Arn.fromString(shardInfo.getStreamArn());
+        this.metricGroup =
+                rootMetricGroup
+                        
.addGroup(MetricConstants.DYNAMODB_STREAMS_SOURCE_METRIC_GROUP)
+                        
.addGroup(MetricConstants.KINEIS_ANALYTICS_METRIC_GROUP)

Review Comment:
   removed this from the code. I think we need to think a better way on how to 
show the metrics to customer's cloudwatch or atleast give some way of having 
p100 of all millisBehindLatest



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##########
@@ -174,6 +184,13 @@ private String getShardIterator(
                     shardId,
                     streamArn);
             return null;
+        } catch (TrimmedDataAccessException e) {
+            LOG.info(
+                    "Received TrimmedDataAccessException. "
+                            + "Shard {} of stream {} is no longer valid, 
marking it as complete.",
+                    shardId,
+                    streamArn);
+            return null;

Review Comment:
   This is just the API behaviour. TrimmedDataAccessException for 
GetShardIterator means that the shard has been expired. For GetRecords, it 
essentially means that the shard has been trimmed midway. We might still be 
able to read from shard if GetRecords returns this exception, but not from 
GetShardIterator



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##########
@@ -93,6 +92,13 @@ public GetRecordsResponse getRecords(
                 shardIdToIteratorStore.put(shardId, 
getRecordsResponse.nextShardIterator());
             }
             return getRecordsResponse;
+        } catch (TrimmedDataAccessException e) {
+            shardIterator = getShardIterator(streamArn, shardId, 
StartingPosition.fromStart());
+            GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
+            if (getRecordsResponse.nextShardIterator() != null) {
+                shardIdToIteratorStore.put(shardId, 
getRecordsResponse.nextShardIterator());
+            }
+            return getRecordsResponse;

Review Comment:
   added comment to explain this



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/ShardGraphTrackerTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShard;
+import static 
org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the {@link SplitGraphInconsistencyTracker} class to verify that it 
correctly discovers
+ * inconsistencies within the shard graph.
+ */
+public class ShardGraphTrackerTest {
+    @Test
+    public void testShardGraphTrackerHappyCase() {
+        List<Shard> shards =
+                Arrays.asList(
+                        // shards which don't have a parent
+                        generateShard(0, "1400", "1700", null),
+                        generateShard(1, "1500", "1800", null),
+                        // shards produced by rotation of parents
+                        generateShard(2, "1710", null, generateShardId(0)),
+                        generateShard(3, "1520", null, generateShardId(1)));
+        SplitGraphInconsistencyTracker shardGraphTracker = new 
SplitGraphInconsistencyTracker();
+        shardGraphTracker.addNodes(shards);
+        assertThat(shardGraphTracker.inconsistencyDetected()).isFalse();
+        
assertThat(shardGraphTracker.getNodes()).containsExactlyInAnyOrderElementsOf(shards);

Review Comment:
   current UTs should already be testing this



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any 
unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();

Review Comment:
   correct, not having finishedSplits leads to reprocessing because we're doing 
a full shard discovery every time



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java:
##########
@@ -56,17 +58,19 @@ public static class TestDynamoDbStreamsProxy implements 
StreamProxy {
 
         // List shards configuration
         private final List<Shard> shards = new ArrayList<>();
+        private final Instant creationTimestamp = Instant.now();
         private Supplier<Exception> listShardsExceptionSupplier;
-        private boolean shouldRespectLastSeenShardId = true;
+        private boolean shouldRespectLastSeenShardId = false;

Review Comment:
   yeah, i don't think its needed. Removed this.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any 
unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();
+    private final String streamArn;
+    private final InitialPosition initialPosition;
+
+    public SplitTracker(String streamArn, InitialPosition initialPosition) {
+        this(Collections.emptyList(), streamArn, initialPosition);
+    }
+
+    public SplitTracker(
+            List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+            String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition 
initialPosition) {
+        this.streamArn = streamArn;
+        this.initialPosition = initialPosition;
+        initialState.forEach(
+                splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = 
splitWithStatus.split();
+                    knownSplits.put(currentSplit.splitId(), currentSplit);
+
+                    if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+                        assignedSplits.add(splitWithStatus.split().splitId());
+                    }
+                    if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+                        finishedSplits.add(splitWithStatus.split().splitId());
+                    }
+                });
+    }
+
+    /**
+     * Add newly discovered splits to tracker.
+     *
+     * @param shardsToAdd collection of splits to add to tracking
+     */
+    public void addSplits(Collection<Shard> shardsToAdd) {
+        List<DynamoDbStreamsShardSplit> newSplitsToAdd =
+                determineNewShardsToBePutForAssignment(shardsToAdd);
+        newSplitsToAdd.forEach(
+                split -> {
+                    knownSplits.put(split.splitId(), split);
+                });
+    }
+
+    private List<Shard> getOpenShards(Collection<Shard> shards) {
+        List<Shard> openShards = new ArrayList<>();
+        for (Shard shard : shards) {
+            if (shard.sequenceNumberRange().endingSequenceNumber() == null) {
+                openShards.add(shard);
+            }
+        }
+        return openShards;
+    }
+
+    private Map<String, Shard> getShardIdToShardMap(Collection<Shard> shards) {
+        Map<String, Shard> shardIdToShardMap = new HashMap<>();
+        for (Shard shard : shards) {
+            shardIdToShardMap.put(shard.shardId(), shard);
+        }
+        return shardIdToShardMap;
+    }
+
+    /**
+     * This function finds the open shards returned from describeStream 
operation and adds them
+     * along with their parents if parents are not already tracked to the 
tracked splits
+     *
+     * <p>This is needed because describestream has an inconsistency where for 
example if a shard s
+     * is split into s1 and s2, in one describestream operation, its possible 
that only one of s1
+     * and s2 is returned.
+     *
+     * <p>We will go up the shard lineage until we find a parent shard which 
is not yet tracked by
+     * SplitTracker If no ancestor is tracked, the first ancestor will be read 
from the initial
+     * position configured and all its descendants will be read from 
TRIM_HORIZON
+     *
+     * @param shards the shards returned from DescribeStream operation
+     * @return list of {@link DynamoDbStreamsShardSplit} which will be put to 
tracked splits
+     */
+    private List<DynamoDbStreamsShardSplit> 
determineNewShardsToBePutForAssignment(
+            Collection<Shard> shards) {
+        Map<String, Shard> shardIdToShardMap = getShardIdToShardMap(shards);
+        List<Shard> openShards = getOpenShards(shards);
+        List<DynamoDbStreamsShardSplit> newSplitsToBeTracked = new 
ArrayList<>();
+        Map<String, Boolean> memoizationContext = new HashMap<>();
+        for (Shard openShard : openShards) {
+            String shardId = openShard.shardId();
+            if (!knownSplits.containsKey(shardId)) {
+                boolean isDescendant =
+                        checkIfShardIsDescendantAndAddAncestorsToBeTracked(
+                                openShard.shardId(),
+                                shardIdToShardMap,
+                                newSplitsToBeTracked,
+                                memoizationContext);
+                if (isDescendant) {
+                    newSplitsToBeTracked.add(mapToSplit(openShard, 
TRIM_HORIZON));
+                } else {
+                    newSplitsToBeTracked.add(mapToSplit(openShard, 
initialPosition));
+                }
+            }
+        }
+        return newSplitsToBeTracked;
+    }
+
+    /**
+     * Check if any ancestor shard of the current shard has not been tracked 
yet. Take this example:
+     * 0->3->8, 0->4->9, 1->5, 1->6, 2->7
+     *
+     * <p>At epoch 1, the lineage looked like this due to describestream 
inconsistency 0->3, 1->5,
+     * 1->6, 2->7 knownSplits = 0,1,2,3,5,6,7 After a few describestream 
calls, at epoch 2, after
+     * the whole lineage got discovered, since 4 was not tracked, we should 
start tracking 4 also.
+     * knownSplits = 0,1,2,3,4,5,6,7,8,9.
+     */
+    private boolean checkIfShardIsDescendantAndAddAncestorsToBeTracked(
+            String shardId,
+            Map<String, Shard> shardIdToShardMap,
+            List<DynamoDbStreamsShardSplit> newSplitsToBeTracked,
+            Map<String, Boolean> memoizationContext) {
+        Boolean previousValue = memoizationContext.get(shardId);
+        if (previousValue != null) {
+            return previousValue;
+        }
+
+        if (shardId != null && shardIdToShardMap.containsKey(shardId)) {
+            if (knownSplits.containsKey(shardId)) {
+                return true;
+            } else {
+                Shard shard = shardIdToShardMap.get(shardId);
+                String parentShardId = shard.parentShardId();
+                boolean isParentShardDescendant =
+                        checkIfShardIsDescendantAndAddAncestorsToBeTracked(
+                                parentShardId,
+                                shardIdToShardMap,
+                                newSplitsToBeTracked,
+                                memoizationContext);
+                if (shardIdToShardMap.containsKey(parentShardId)) {
+                    if (!knownSplits.containsKey(parentShardId)) {
+                        Shard parentShard = 
shardIdToShardMap.get(parentShardId);
+                        if (isParentShardDescendant) {
+                            newSplitsToBeTracked.add(mapToSplit(parentShard, 
TRIM_HORIZON));
+                        } else {
+                            newSplitsToBeTracked.add(mapToSplit(parentShard, 
initialPosition));
+                        }
+                        return true;
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+
+    private DynamoDbStreamsShardSplit mapToSplit(
+            Shard shard, DynamodbStreamsSourceConfigConstants.InitialPosition 
initialPosition) {
+        StartingPosition startingPosition;
+        switch (initialPosition) {
+            case LATEST:
+                startingPosition = StartingPosition.latest();
+                break;
+            case TRIM_HORIZON:
+            default:
+                startingPosition = StartingPosition.fromStart();
+        }
+
+        Set<String> parentShardIds = new HashSet<>();
+        if (shard.parentShardId() != null) {
+            parentShardIds.add(shard.parentShardId());
+        }
+        return new DynamoDbStreamsShardSplit(
+                streamArn, shard.shardId(), startingPosition, parentShardIds);
+    }
+
+    /**
+     * Mark splits as assigned. Assigned splits will no longer be returned as 
pending splits.
+     *
+     * @param splitsToAssign collection of splits to mark as assigned
+     */
+    public void markAsAssigned(Collection<DynamoDbStreamsShardSplit> 
splitsToAssign) {
+        splitsToAssign.forEach(split -> assignedSplits.add(split.splitId()));
+    }
+
+    /**
+     * Mark splits as finished. Assigned splits will no longer be returned as 
pending splits.
+     *
+     * @param splitsToFinish collection of splits to mark as assigned
+     */
+    public void markAsFinished(Collection<String> splitsToFinish) {
+        splitsToFinish.forEach(
+                splitId -> {
+                    finishedSplits.add(splitId);
+                    assignedSplits.remove(splitId);
+                });
+    }
+
+    public boolean isAssigned(String splitId) {
+        return assignedSplits.contains(splitId);
+    }
+
+    /**
+     * Since we never put an inconsistent shard lineage to splitTracker, so if 
a shard's parent is
+     * not there, that means that that should already be cleaned up.
+     */
+    public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment() {
+        return knownSplits.values().stream()
+                .filter(
+                        split -> {
+                            boolean splitIsNotAssigned = 
!isAssigned(split.splitId());
+                            return splitIsNotAssigned
+                                    && !isFinished(split.splitId())
+                                    && (verifyAllParentSplitsAreFinished(split)
+                                            || 
verifyAllParentSplitsAreCleanedUp(split));

Review Comment:
   pushed down to a separate function



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/ShardGraphTrackerTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShard;
+import static 
org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the {@link SplitGraphInconsistencyTracker} class to verify that it 
correctly discovers
+ * inconsistencies within the shard graph.
+ */
+public class ShardGraphTrackerTest {
+    @Test
+    public void testShardGraphTrackerHappyCase() {
+        List<Shard> shards =
+                Arrays.asList(
+                        // shards which don't have a parent
+                        generateShard(0, "1400", "1700", null),
+                        generateShard(1, "1500", "1800", null),
+                        // shards produced by rotation of parents
+                        generateShard(2, "1710", null, generateShardId(0)),
+                        generateShard(3, "1520", null, generateShardId(1)));
+        SplitGraphInconsistencyTracker shardGraphTracker = new 
SplitGraphInconsistencyTracker();
+        shardGraphTracker.addNodes(shards);
+        assertThat(shardGraphTracker.inconsistencyDetected()).isFalse();
+        
assertThat(shardGraphTracker.getNodes()).containsExactlyInAnyOrderElementsOf(shards);
+    }
+
+    @Test
+    public void testShardGraphTrackerDetectsInconsistencies() {
+        List<Shard> shards =
+                Arrays.asList(
+                        // shards which don't have a parent
+                        generateShard(0, "1400", "1700", null),
+                        generateShard(1, "1500", "1800", null),
+                        // shards produced by rotation of parents
+                        generateShard(2, "1710", null, generateShardId(0)));

Review Comment:
   should be covered in the UTs now



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -40,9 +42,41 @@ public enum InitialPosition {
     public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
             ConfigOptions.key("flink.shard.discovery.intervalmillis")
                     .longType()
-                    .defaultValue(10000L)
+                    .defaultValue(60000L)
                     .withDescription("The interval between each attempt to 
discover new shards.");

Review Comment:
   hmm, it seems the callAsync method of enumerator takes long as the 
parameter. So after making it duration, we'd again have to convert it to long, 
so prefer not making it duration type



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java:
##########
@@ -93,8 +100,19 @@ public DynamoDbStreamsShardSplit deserialize(int version, 
byte[] serialized)
                 }
             }
 
+            Set<String> parentShardIds = new HashSet<>();
+            if (version == CURRENT_VERSION) {

Review Comment:
   good catch. Removed this check



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -143,17 +217,18 @@ public void addReader(int subtaskId) {
 
     @Override
     public DynamoDbStreamsSourceEnumeratorState snapshotState(long 
checkpointId) throws Exception {

Review Comment:
   added UT



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+import org.apache.flink.connector.dynamodb.source.util.ShardUtils;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any 
unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();
+    private final String streamArn;
+    private final InitialPosition initialPosition;
+
+    public SplitTracker(String streamArn, InitialPosition initialPosition) {
+        this(Collections.emptyList(), streamArn, initialPosition);
+    }
+
+    public SplitTracker(
+            List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+            String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition 
initialPosition) {
+        this.streamArn = streamArn;
+        this.initialPosition = initialPosition;
+        initialState.forEach(
+                splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = 
splitWithStatus.split();
+                    knownSplits.put(currentSplit.splitId(), currentSplit);
+
+                    if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+                        assignedSplits.add(splitWithStatus.split().splitId());
+                    }
+                    if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+                        finishedSplits.add(splitWithStatus.split().splitId());
+                    }
+                });
+    }
+
+    /**
+     * Add newly discovered splits to tracker.
+     *
+     * @param shardsToAdd collection of splits to add to tracking
+     */
+    public void addSplits(Collection<Shard> shardsToAdd) {
+        Map<String, Shard> shardIdToShardMap = 
getShardIdToShardMap(shardsToAdd);
+
+        for (Shard shard : shardsToAdd) {
+            String shardId = shard.shardId();
+            if (!knownSplits.containsKey(shardId)) {
+                DynamoDbStreamsShardSplit newSplit =
+                        mapToSplit(shard, getStartingPosition(shard, 
shardIdToShardMap));
+                knownSplits.put(shardId, newSplit);
+            }
+        }
+    }
+
+    private InitialPosition getStartingPosition(Shard shard, Map<String, 
Shard> shardIdToShardMap) {
+        if (shard.parentShardId() == null) {
+            return initialPosition;
+        }
+        Shard parentShard = shardIdToShardMap.get(shard.parentShardId());
+        if (parentShard == null) {
+            return TRIM_HORIZON;
+        }
+        InitialPosition parentStartingPosition =
+                getStartingPosition(parentShard, shardIdToShardMap);
+        if (parentStartingPosition == initialPosition) {
+            return TRIM_HORIZON;
+        }
+        return parentStartingPosition;
+    }

Review Comment:
   this makes sense. I modified this algo to be as follows: if knownSplits 
don't contain the parent shard id of the current shard, we'll use the 
configured initial position, otherwise, we'll use TRIM_HORIZON. 
   
   Lets take the case of LATEST shard iterator:
   
   Lets take this lineage for example:
   ```
       1      3
       |      |
       2      4
   ```
   
   If our previous snapshot was this due to inconsistencies in describestream:
   ```
     1
     |
     2
   ```
   
   When 3 and 4 are returned, first 3 will be added to knownSplits and its 
initialPosition configured will be LATEST. After that, 4 will be added and 
since 3 is already added to knownSplits, so 4 startPosition will be 
TRIM_HORIZON 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to