This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch v6.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit e622064f4a1a33e996581e9bebfc44133eb74bc8
Author: Abhi Gupta <[email protected]>
AuthorDate: Thu Dec 11 11:54:30 2025 +0530

    [FLINK-36296] Add support for incremental shard discovery for DynamoDB Streams Source

    (cherry picked from commit 68acdbd69b9ba5beb00e26abf277b6ec5c660e92)
---
 .../connector/aws/util/AWSClientUtilTest.java      |  6 +-
 .../sink/DynamoDbTypeInformedElementConverter.java |  5 +-
 .../dynamodb/source/DynamoDbStreamsSource.java     | 45 ++++++-----
 .../DynamodbStreamsSourceConfigConstants.java      |  4 +
 .../DynamoDbStreamsSourceEnumerator.java           | 26 ++++--
 .../enumerator/event/SplitsFinishedEvent.java      | 15 ++--
 .../event/SplitsFinishedEventContext.java          | 52 ++++++++++++
 .../source/enumerator/tracker/SplitTracker.java    |  4 +
 .../source/proxy/DynamoDbStreamsProxy.java         | 43 ++++++++++
 .../dynamodb/source/proxy/StreamProxy.java         | 10 +++
 .../source/reader/DynamoDbStreamsSourceReader.java | 50 +++++++++---
 .../PollingDynamoDbStreamsShardSplitReader.java    | 92 +++++++++++++++++++---
 .../source/split/DynamoDbStreamsShardSplit.java    | 40 +++++++++-
 .../split/DynamoDbStreamsShardSplitSerializer.java | 49 +++++++++++-
 .../split/DynamoDbStreamsShardSplitState.java      | 10 +++
 .../DynamoDbStreamsSourceEnumeratorTest.java       | 36 ++++++---
 .../source/proxy/DynamoDbStreamsProxyTest.java     | 86 +++++++++++++++++++-
 .../reader/DynamoDbStreamsSourceReaderTest.java    | 16 +++-
 ...PollingDynamoDbStreamsShardSplitReaderTest.java | 16 +++-
 .../DynamoDbStreamsShardSplitSerializerTest.java   | 14 ++++
 .../source/util/DynamoDbStreamsClientProvider.java | 29 +++++++
 .../source/util/DynamoDbStreamsProxyProvider.java  | 23 ++++++
 .../connector/dynamodb/source/util/TestUtil.java   | 35 ++++++++
 pom.xml                                            |  2 +-
 24 files changed, 627 insertions(+), 81 deletions(-)

diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
index 67331e3..16e6aa6 100644
--- a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSClientUtilTest.java
@@ -194,8 +194,10 @@ class AWSClientUtilTest {
         ClientOverrideConfiguration resultOverrideConfiguration =
                 s3Client.serviceClientConfiguration().overrideConfiguration();
 
-        assertThat(resultOverrideConfiguration.retryStrategy())
-                .isEqualTo(Optional.of(overrideRetryStrategy));
+        assertThat(resultOverrideConfiguration.retryStrategy()).isPresent();
+        RetryStrategy resultStrategy = resultOverrideConfiguration.retryStrategy().get();
+        assertThat(resultStrategy.maxAttempts()).isEqualTo(10);
+
         assertThat(resultOverrideConfiguration.retryPolicy()).isEqualTo(Optional.empty());
         assertThat(resultOverrideConfiguration.retryMode()).isEqualTo(Optional.empty());
         assertThat(resultOverrideConfiguration.retryStrategyConfigurator())
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
index 4978f46..c41d30f 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
@@ -50,6 +50,7 @@ import java.beans.BeanInfo;
 import java.beans.IntrospectionException;
 import java.beans.Introspector;
 import java.beans.PropertyDescriptor;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -176,7 +177,9 @@ public class DynamoDbTypeInformedElementConverter<T>
                     tableSchemaBuilder,
                     propertyDescriptor.getName(),
                     BeanAttributeGetter.create(
-                            typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()),
+                            typeInfo.getTypeClass(),
+                            propertyDescriptor.getReadMethod(),
+                            MethodHandles.lookup()),
                     fieldInfo);
         }
 
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
index 5b20bc1..051d127 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
@@ -55,10 +55,13 @@ import software.amazon.awssdk.http.SdkHttpClient;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
 import software.amazon.awssdk.retries.api.BackoffStrategy;
+import software.amazon.awssdk.retries.api.RetryStrategy;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 import software.amazon.awssdk.utils.AttributeMap;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -145,14 +148,18 @@ public class DynamoDbStreamsSource<T>
         final Duration getRecordsIdlePollingTimeBetweenEmptyPolls =
                 sourceConfig.get(DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS);
 
+        Map<String, List<Shard>> childShardMap = new ConcurrentHashMap<>();
         // We create a new stream proxy for each split reader since they have their own independent
         // lifecycle.
         Supplier<PollingDynamoDbStreamsShardSplitReader> splitReaderSupplier =
                 () ->
                         new PollingDynamoDbStreamsShardSplitReader(
-                                createDynamoDbStreamsProxy(sourceConfig),
+                                createDynamoDbStreamsProxy(
+                                        sourceConfig,
+                                        SdkDefaultRetryStrategy.defaultRetryStrategy()),
                                 getRecordsIdlePollingTimeBetweenNonEmptyPolls,
                                 getRecordsIdlePollingTimeBetweenEmptyPolls,
+                                childShardMap,
                                 shardMetricGroupMap);
         DynamoDbStreamsRecordEmitter<T> recordEmitter =
                 new DynamoDbStreamsRecordEmitter<>(deserializationSchema);
@@ -162,6 +169,7 @@ public class DynamoDbStreamsSource<T>
                 recordEmitter,
                 sourceConfig,
                 readerContext,
+                childShardMap,
                 shardMetricGroupMap);
     }
 
@@ -178,11 +186,25 @@ public class DynamoDbStreamsSource<T>
             SplitEnumeratorContext<DynamoDbStreamsShardSplit> enumContext,
             DynamoDbStreamsSourceEnumeratorState checkpoint)
             throws Exception {
+        int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
+        Duration minDelayBetweenRetries =
+                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
+        Duration maxDelayBetweenRetries =
+                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
+        BackoffStrategy backoffStrategy =
+                BackoffStrategy.exponentialDelay(minDelayBetweenRetries, maxDelayBetweenRetries);
+        AdaptiveRetryStrategy adaptiveRetryStrategy =
+                SdkDefaultRetryStrategy.adaptiveRetryStrategy()
+                        .toBuilder()
+                        .maxAttempts(maxApiCallAttempts)
+                        .backoffStrategy(backoffStrategy)
+                        .throttlingBackoffStrategy(backoffStrategy)
+                        .build();
         return new DynamoDbStreamsSourceEnumerator(
                 enumContext,
                 streamArn,
                 sourceConfig,
-                createDynamoDbStreamsProxy(sourceConfig),
+                createDynamoDbStreamsProxy(sourceConfig, adaptiveRetryStrategy),
                 dynamoDbStreamsShardAssigner,
                 checkpoint);
     }
@@ -199,7 +221,8 @@ public class DynamoDbStreamsSource<T>
                 new DynamoDbStreamsShardSplitSerializer());
     }
 
-    private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerConfig) {
+    private DynamoDbStreamsProxy createDynamoDbStreamsProxy(
+            Configuration consumerConfig, RetryStrategy retryStrategy) {
         SdkHttpClient httpClient =
                 AWSGeneralUtil.createSyncHttpClient(
                         AttributeMap.builder().build(), ApacheHttpClient.builder());
@@ -215,26 +238,12 @@ public class DynamoDbStreamsSource<T>
         consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);
         AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);
 
-        int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
-        Duration minDescribeStreamDelay =
-                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
-        Duration maxDescribeStreamDelay =
-                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
-        BackoffStrategy backoffStrategy =
-                BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
-        AdaptiveRetryStrategy adaptiveRetryStrategy =
-                SdkDefaultRetryStrategy.adaptiveRetryStrategy()
-                        .toBuilder()
-                        .maxAttempts(maxApiCallAttempts)
-                        .backoffStrategy(backoffStrategy)
-                        .throttlingBackoffStrategy(backoffStrategy)
-                        .build();
         DynamoDbStreamsClient dynamoDbStreamsClient =
                 AWSClientUtil.createAwsSyncClient(
                         dynamoDbStreamsClientProperties,
                         httpClient,
                         DynamoDbStreamsClient.builder(),
-                        ClientOverrideConfiguration.builder().retryStrategy(adaptiveRetryStrategy),
+                        ClientOverrideConfiguration.builder().retryStrategy(retryStrategy),
                         DynamodbStreamsSourceConfigConstants
                                 .BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT,
                         DynamodbStreamsSourceConfigConstants.DDB_STREAMS_CLIENT_USER_AGENT_PREFIX);
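The change above splits retry handling: the enumerator's proxy gets the adaptive strategy built from the configured retry options, while each split reader's proxy uses the SDK default strategy. For orientation, a hypothetical usage sketch of the source these factory methods belong to (builder setter names are assumed from the connector's public API and are not part of this commit):

    // Hypothetical wiring sketch; setter names assumed, not shown in this diff.
    DynamoDbStreamsSource<T> source =
            DynamoDbStreamsSource.<T>builder()
                    .setStreamArn(streamArn)            // ARN of the DynamoDB stream
                    .setSourceConfig(sourceConfig)      // carries the retry/backoff options read above
                    .setDeserializationSchema(deserializationSchema)
                    .build();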
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
index b16833e..8e21d09 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
@@ -91,6 +91,10 @@ public class DynamodbStreamsSourceConfigConstants {
                     .withDescription(
                             "The default idle time between non-empty polls for DynamoDB Streams GetRecords API");
 
+    public static final int MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5;
+    public static final long CHILD_SHARD_DISCOVERY_MIN_DELAY_MS = 100;
+    public static final long CHILD_SHARD_DISCOVERY_MAX_DELAY_MS = 1000;
+
     /** DynamoDb Streams identifier for user agent prefix. */
     public static final String DDB_STREAMS_CLIENT_USER_AGENT_PREFIX =
             "aws.dynamodbstreams.client.user-agent-prefix";
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
index 0c2f00b..c178611 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
 import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
 import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitGraphInconsistencyTracker;
 import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitTracker;
 import org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException;
@@ -138,7 +139,22 @@ public class DynamoDbStreamsSourceEnumerator
     /** When we mark a split as finished, we will only assign its child splits to the subtasks. */
     private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
-        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        Set<String> finishedSplitIds =
+                splitsFinishedEvent.getFinishedSplits().stream()
+                        .map(SplitsFinishedEventContext::getSplitId)
+                        .collect(Collectors.toSet());
+        splitTracker.markAsFinished(finishedSplitIds);
+        List<Shard> childrenOfFinishedSplits = new ArrayList<>();
+        splitsFinishedEvent
+                .getFinishedSplits()
+                .forEach(
+                        finishedSplitEvent ->
+                                childrenOfFinishedSplits.addAll(
+                                        finishedSplitEvent.getChildSplits()));
+        LOG.debug(
+                "Adding children of finished splits to splitTracker: {}", childrenOfFinishedSplits);
+        LOG.info("Added {} child splits to splitTracker", childrenOfFinishedSplits.size());
+        splitTracker.addChildSplits(childrenOfFinishedSplits);
         Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
         // during recovery, splitAssignment may return null since there might be no split assigned
@@ -152,13 +168,12 @@ public class DynamoDbStreamsSourceEnumerator
                             + "Child shard discovery might be delayed until we have enough readers."
                             + "Finished split ids: {}",
                     subtaskId,
-                    splitsFinishedEvent.getFinishedSplitIds());
+                    finishedSplitIds);
             return;
         }
 
-        splitsAssignment.removeIf(
-                split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
-        assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
+        splitsAssignment.removeIf(split -> finishedSplitIds.contains(split.splitId()));
+        assignChildSplits(finishedSplitIds);
     }
 
     private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
@@ -230,6 +245,7 @@ public class DynamoDbStreamsSourceEnumerator
     private void assignChildSplits(Set<String> finishedSplitIds) {
         List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
                 splitTracker.getUnassignedChildSplits(finishedSplitIds);
+        LOG.info("Unassigned child splits: {}", splitsAvailableForAssignment);
         assignSplits(splitsAvailableForAssignment);
     }
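A minimal sketch of the new event shape handled above, with hypothetical shard ids: the reader packages each finished split together with the children it discovered, and handleFinishedSplits() both marks the parent finished and registers the children from TRIM_HORIZON.

    Shard child =
            Shard.builder()
                    .shardId("shardId-000000000002")       // hypothetical child
                    .parentShardId("shardId-000000000001") // the finished split
                    .build();
    SplitsFinishedEvent event =
            new SplitsFinishedEvent(
                    Collections.singleton(
                            new SplitsFinishedEventContext(
                                    "shardId-000000000001", List.of(child))));
    // enumerator.handleSourceEvent(subtaskId, event) then calls
    // splitTracker.markAsFinished(...) and splitTracker.addChildSplits(...).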
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
index 0da5f01..22659c4 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
@@ -22,26 +22,27 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceEvent;
 
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Source event used by source reader to communicate that splits are finished to enumerator. */
 @Internal
 public class SplitsFinishedEvent implements SourceEvent {
     private static final long serialVersionUID = 1;
-    private final Set<String> finishedSplitIds;
+    private final Set<SplitsFinishedEventContext> finishedSplits;
 
-    public SplitsFinishedEvent(Set<String> finishedSplitIds) {
-        this.finishedSplitIds = finishedSplitIds;
+    public SplitsFinishedEvent(Set<SplitsFinishedEventContext> finishedSplits) {
+        this.finishedSplits = finishedSplits;
     }
 
-    public Set<String> getFinishedSplitIds() {
-        return finishedSplitIds;
+    public Set<SplitsFinishedEventContext> getFinishedSplits() {
+        return finishedSplits;
     }
 
     @Override
     public String toString() {
         return "SplitsFinishedEvent{"
-                + "finishedSplitIds=["
-                + String.join(",", finishedSplitIds)
+                + "finishedSplits=["
+                + finishedSplits.stream().map(Object::toString).collect(Collectors.joining(","))
                 + "]}";
     }
 }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java
new file mode 100644
index 0000000..67bf76c
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java
@@ -0,0 +1,52 @@
+package org.apache.flink.connector.dynamodb.source.enumerator.event;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/** Context which contains the split id and the child splits for a finished split event. */
+@Internal
+public class SplitsFinishedEventContext implements Serializable {
+    private static final long serialVersionUID = 2L;
+    private final String splitId;
+    private final List<Shard> childSplits;
+
+    public SplitsFinishedEventContext(String splitId, List<Shard> childSplits) {
+        this.splitId = splitId;
+        this.childSplits = childSplits;
+    }
+
+    public String getSplitId() {
+        return splitId;
+    }
+
+    public List<Shard> getChildSplits() {
+        return childSplits;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SplitsFinishedEventContext that = (SplitsFinishedEventContext) o;
+
+        if (!splitId.equals(that.splitId)) {
+            return false;
+        }
+        return childSplits.equals(that.childSplits);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(splitId, childSplits);
+    }
+}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
index e655d4c..042a216 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
@@ -102,6 +102,10 @@ public class SplitTracker {
         addSplitsForLatest(shardsToAdd);
     }
 
+    public void addChildSplits(Collection<Shard> childShardsToAdd) {
+        addSplitsForTrimHorizon(childShardsToAdd);
+    }
+
     private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
         List<Shard> openShards =
                 shardsToAdd.stream()
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
index 537b1bf..133da8d 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
@@ -32,6 +32,7 @@ import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
 import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
 import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
 import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
@@ -88,6 +89,24 @@ public class DynamoDbStreamsProxy implements StreamProxy {
         return listShardsResult;
     }
 
+    @Override
+    public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+        LOG.debug("Listing child shards with filter for shardId: {}", shardFilter.shardId());
+        ListShardsResult listShardsResult = new ListShardsResult();
+
+        try {
+            DescribeStreamResponse describeStreamResponse =
+                    this.describeStream(streamArn, shardFilter);
+            listShardsResult.addShards(describeStreamResponse.streamDescription().shards());
+            listShardsResult.setStreamStatus(
+                    describeStreamResponse.streamDescription().streamStatus());
+        } catch (Exception e) {
+            LOG.warn("DescribeStream with Filter API threw an exception", e);
+        }
+        LOG.info("Child shards returned for shardId {}: {}", shardFilter.shardId(), listShardsResult);
+        return listShardsResult;
+    }
+
     @Override
     public GetRecordsResponse getRecords(
             String streamArn, String shardId, StartingPosition startingPosition) {
@@ -170,6 +189,30 @@ public class DynamoDbStreamsProxy implements StreamProxy {
         return describeStreamResponse;
     }
 
+    private DescribeStreamResponse describeStream(String streamArn, ShardFilter shardFilter) {
+        final DescribeStreamRequest describeStreamRequest =
+                DescribeStreamRequest.builder()
+                        .streamArn(streamArn)
+                        .shardFilter(shardFilter)
+                        .build();
+
+        DescribeStreamResponse describeStreamResponse =
+                dynamoDbStreamsClient.describeStream(describeStreamRequest);
+
+        StreamStatus streamStatus = describeStreamResponse.streamDescription().streamStatus();
+        if (streamStatus.equals(StreamStatus.ENABLING)) {
+            if (LOG.isWarnEnabled()) {
+                LOG.warn(
+                        String.format(
+                                "The status of stream %s is %s; result of the current "
+                                        + "describeStream operation will not contain any shard information.",
+                                streamArn, streamStatus));
+            }
+        }
+
+        return describeStreamResponse;
+    }
+
     private String getShardIterator(
             String streamArn, String shardId, StartingPosition startingPosition) {
         GetShardIteratorRequest.Builder requestBuilder =
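The CHILD_SHARDS filter is evaluated by the DynamoDB Streams service itself, so the proxy only ever receives the children of the requested shard. A sketch of calling the new method, with a hypothetical parent shard id:

    ShardFilter filter =
            ShardFilter.builder()
                    .type(ShardFilterType.CHILD_SHARDS)
                    .shardId("shardId-000000000001") // hypothetical parent shard
                    .build();
    ListShardsResult children = streamProxy.listShardsWithFilter(streamArn, filter);
    // children.getShards() holds only shards whose parentShardId matches;
    // children.getStreamStatus() lets the caller bail out if the stream is not ENABLED.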
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
index fa9bf7c..10c941e 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
 import org.apache.flink.connector.dynamodb.source.util.ListShardsResult;
 
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
 
 import javax.annotation.Nullable;
 
@@ -41,6 +42,15 @@ public interface StreamProxy extends Closeable {
      */
     ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId);
 
+    /**
+     * Obtains the child shards of a given shard, filtered by the {@link ShardFilter}.
+     *
+     * @param streamArn the ARN of the stream
+     * @param shardFilter the filter to apply
+     * @return shard list
+     */
+    ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter);
+
     /**
      * Retrieves records from the stream.
      *
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
index 90a105d..9988dfb 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
 import org.apache.flink.connector.dynamodb.source.metrics.DynamoDbStreamsShardMetrics;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitState;
@@ -32,6 +33,7 @@ import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSpli
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,6 +43,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * Coordinates the reading from assigned splits. Runs on the TaskManager.
@@ -55,6 +58,7 @@ public class DynamoDbStreamsSourceReader<T>
     private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceReader.class);
     private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
     private final NavigableMap<Long, Set<DynamoDbStreamsShardSplit>> splitFinishedEvents;
+    private final Map<String, List<Shard>> childShardIdMap;
     private long currentCheckpointId;
 
     public DynamoDbStreamsSourceReader(
@@ -62,10 +66,12 @@ public class DynamoDbStreamsSourceReader<T>
             RecordEmitter<Record, T, DynamoDbStreamsShardSplitState> recordEmitter,
             Configuration config,
             SourceReaderContext context,
+            Map<String, List<Shard>> childShardIdMap,
             Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
         super(splitFetcherManager, recordEmitter, config, context);
         this.shardMetricGroupMap = shardMetricGroupMap;
         this.splitFinishedEvents = new TreeMap<>();
+        this.childShardIdMap = childShardIdMap;
         this.currentCheckpointId = Long.MIN_VALUE;
     }
 
@@ -83,19 +89,39 @@ public class DynamoDbStreamsSourceReader<T>
         splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
         finishedSplitIds.values().stream()
                 .map(
-                        finishedSplit ->
-                                new DynamoDbStreamsShardSplit(
-                                        finishedSplit.getStreamArn(),
-                                        finishedSplit.getShardId(),
-                                        finishedSplit.getNextStartingPosition(),
-                                        finishedSplit
-                                                .getDynamoDbStreamsShardSplit()
-                                                .getParentShardId(),
-                                        true))
+                        finishedSplit -> {
+                            List<Shard> childSplits = new ArrayList<>();
+                            String finishedSplitId = finishedSplit.getSplitId();
+                            if (childShardIdMap.containsKey(finishedSplitId)) {
+                                List<Shard> childSplitIdsOfFinishedSplit =
+                                        childShardIdMap.get(finishedSplitId);
+                                childSplits.addAll(childSplitIdsOfFinishedSplit);
+                            }
+                            return new DynamoDbStreamsShardSplit(
+                                    finishedSplit.getStreamArn(),
+                                    finishedSplit.getShardId(),
+                                    finishedSplit.getNextStartingPosition(),
+                                    finishedSplit.getDynamoDbStreamsShardSplit().getParentShardId(),
+                                    childSplits,
+                                    true);
+                        })
                 .forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));
 
+        Set<SplitsFinishedEventContext> splitsFinishedEventContextMap =
+                finishedSplitIds.values().stream()
+                        .map(DynamoDbStreamsShardSplitState::getSplitId)
+                        .map(
+                                finishedSplitId ->
+                                        new SplitsFinishedEventContext(
+                                                finishedSplitId,
+                                                childShardIdMap.getOrDefault(
+                                                        finishedSplitId, Collections.emptyList())))
+                        .peek(context -> childShardIdMap.remove(context.getSplitId()))
+                        .collect(Collectors.toSet());
+
+        LOG.info("Sending splits finished event to coordinator: {}", splitsFinishedEventContextMap);
         context.sendSourceEventToCoordinator(
-                new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
+                new SplitsFinishedEvent(splitsFinishedEventContextMap));
         finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
     }
 
@@ -121,8 +147,10 @@ public class DynamoDbStreamsSourceReader<T>
                 // buffer. If the next checkpoint doesn't complete,
                 // we would go back to the previous checkpointed
                 // state which will again replay these split finished events.
+                SplitsFinishedEventContext splitsFinishedEventContext =
+                        new SplitsFinishedEventContext(split.splitId(), split.getChildSplits());
                 context.sendSourceEventToCoordinator(
-                        new SplitsFinishedEvent(Collections.singleton(split.splitId())));
+                        new SplitsFinishedEvent(Collections.singleton(splitsFinishedEventContext)));
             } else {
                 dynamoDbStreamsShardSplits.add(split);
             }
         }
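The reader parks finished splits keyed by checkpoint id so that an aborted checkpoint replays the corresponding SplitsFinishedEvents. A stripped-down sketch of that pattern (the map value type is simplified here, and the clean-up on checkpoint completion is an assumption about the surrounding class, not shown in this diff):

    NavigableMap<Long, Set<String>> splitFinishedEvents = new TreeMap<>();

    // onSplitFinished(...) while checkpoint 42 is in flight:
    splitFinishedEvents.computeIfAbsent(42L, k -> new HashSet<>()).add("shardId-000000000001");

    // notifyCheckpointComplete(42L): events up to and including 42 are safe to drop.
    splitFinishedEvents.headMap(42L, true).clear();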
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
index 8a516be..6b89ee9 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
@@ -27,11 +27,16 @@ import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitState;
 import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+import org.apache.flink.connector.dynamodb.source.util.ListShardsResult;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
 import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
+import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
 
 import javax.annotation.Nullable;
 
@@ -43,10 +48,14 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static java.util.Collections.singleton;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.CHILD_SHARD_DISCOVERY_MAX_DELAY_MS;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.CHILD_SHARD_DISCOVERY_MIN_DELAY_MS;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS;
 
 /**
  * An implementation of the SplitReader that periodically polls the DynamoDb stream to retrieve
@@ -64,6 +73,7 @@ public class PollingDynamoDbStreamsShardSplitReader
     private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls;
 
     private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits;
+    private final Map<String, List<Shard>> childShardMap;
     private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
     private final Set<String> pausedSplitIds;
     private static final Logger LOG =
@@ -73,6 +83,7 @@ public class PollingDynamoDbStreamsShardSplitReader
             StreamProxy dynamodbStreamsProxy,
             Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls,
             Duration getRecordsIdlePollingTimeBetweenEmptyPolls,
+            Map<String, List<Shard>> childShardMap,
             Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
         this.dynamodbStreams = dynamodbStreamsProxy;
         this.getRecordsIdlePollingTimeBetweenNonEmptyPolls =
@@ -80,6 +91,7 @@ public class PollingDynamoDbStreamsShardSplitReader
         this.getRecordsIdlePollingTimeBetweenEmptyPolls =
                 getRecordsIdlePollingTimeBetweenEmptyPolls;
         this.shardMetricGroupMap = shardMetricGroupMap;
+        this.childShardMap = childShardMap;
         this.assignedSplits = new ArrayDeque<>();
         this.pausedSplitIds = new HashSet<>();
     }
@@ -106,6 +118,11 @@ public class PollingDynamoDbStreamsShardSplitReader
         }
 
         long currentTime = System.currentTimeMillis();
+
+        if (splitContext.splitState.isShardEndReached()) {
+            return handleShardEnd(splitContext, currentTime);
+        }
+
         long nextEligibleTime = getNextEligibleTime(splitContext);
 
         LOG.debug(
@@ -132,15 +149,11 @@ public class PollingDynamoDbStreamsShardSplitReader
         splitContext.lastPollTimeMillis = currentTime;
         splitContext.wasLastPollEmpty = isEmptyPoll;
+        splitContext.splitState.setShardEndReached(isComplete);
+        assignedSplits.add(splitContext);
 
         if (isEmptyPoll) {
-            if (isComplete) {
-                return new DynamoDbStreamRecordsWithSplitIds(
-                        Collections.emptyIterator(), splitContext.splitState.getSplitId(), true);
-            } else {
-                assignedSplits.add(splitContext);
-                return INCOMPLETE_SHARD_EMPTY_RECORDS;
-            }
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
         } else {
             DynamoDbStreamsShardMetrics shardMetrics =
                     shardMetricGroupMap.get(splitContext.splitState.getShardId());
@@ -164,13 +177,66 @@ public class PollingDynamoDbStreamsShardSplitReader
                                                 .dynamodb()
                                                 .sequenceNumber()));
 
-        if (!isComplete) {
-            assignedSplits.add(splitContext);
-        }
         return new DynamoDbStreamRecordsWithSplitIds(
                 getRecordsResponse.records().iterator(),
                 splitContext.splitState.getSplitId(),
-                isComplete);
+                false);
+    }
+
+    private RecordsWithSplitIds<Record> handleShardEnd(
+            DynamoDbStreamsShardSplitWithContext splitContext, long currentTime) {
+        if (!splitContext.hasAttemptedChildShardDiscovery) {
+            splitContext.hasAttemptedChildShardDiscovery = true;
+            splitContext.childShardDiscoveryAttempts = 0;
+        }
+
+        if (splitContext.childShardDiscoveryAttempts < MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS) {
+            long nextChildShardDiscoveryEligibleTime =
+                    getNextEligibleTimeForChildDiscovery(splitContext);
+            if (currentTime >= nextChildShardDiscoveryEligibleTime) {
+                ListShardsResult listShardsResult =
+                        dynamodbStreams.listShardsWithFilter(
+                                splitContext.splitState.getStreamArn(),
+                                ShardFilter.builder()
+                                        .shardId(splitContext.splitState.getShardId())
+                                        .type(ShardFilterType.CHILD_SHARDS)
+                                        .build());
+
+                if (!StreamStatus.ENABLED.equals(listShardsResult.getStreamStatus())) {
+                    return new DynamoDbStreamRecordsWithSplitIds(
+                            Collections.emptyIterator(),
+                            splitContext.splitState.getSplitId(),
+                            true);
+                }
+
+                List<Shard> childShards = listShardsResult.getShards();
+                if (!childShards.isEmpty()) {
+                    this.childShardMap.put(splitContext.splitState.getSplitId(), childShards);
+                    return new DynamoDbStreamRecordsWithSplitIds(
+                            Collections.emptyIterator(),
+                            splitContext.splitState.getSplitId(),
+                            true);
+                }
+                splitContext.childShardDiscoveryAttempts++;
+                splitContext.lastChildShardDiscoveryAttemptTime = currentTime;
+            }
+            assignedSplits.add(splitContext);
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        } else {
+            return new DynamoDbStreamRecordsWithSplitIds(
+                    Collections.emptyIterator(), splitContext.splitState.getSplitId(), true);
+        }
+    }
+
+    private long getNextEligibleTimeForChildDiscovery(
+            DynamoDbStreamsShardSplitWithContext splitContext) {
+        long exponentialDelay =
+                Math.min(
+                        CHILD_SHARD_DISCOVERY_MIN_DELAY_MS
+                                * (1L << splitContext.childShardDiscoveryAttempts),
+                        CHILD_SHARD_DISCOVERY_MAX_DELAY_MS);
+        return splitContext.lastChildShardDiscoveryAttemptTime + exponentialDelay;
     }
 
     private void sleep(long milliseconds) throws IOException {
@@ -254,11 +320,15 @@ public class PollingDynamoDbStreamsShardSplitReader
         final DynamoDbStreamsShardSplitState splitState;
         long lastPollTimeMillis;
         boolean wasLastPollEmpty;
+        boolean hasAttemptedChildShardDiscovery;
+        int childShardDiscoveryAttempts;
+        long lastChildShardDiscoveryAttemptTime;
 
         DynamoDbStreamsShardSplitWithContext(DynamoDbStreamsShardSplitState splitState) {
             this.splitState = splitState;
             this.lastPollTimeMillis = System.currentTimeMillis();
             this.wasLastPollEmpty = false;
+            hasAttemptedChildShardDiscovery = false;
         }
     }
 }
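Worked schedule for getNextEligibleTimeForChildDiscovery above, with CHILD_SHARD_DISCOVERY_MIN_DELAY_MS = 100, CHILD_SHARD_DISCOVERY_MAX_DELAY_MS = 1000, and MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5:

    // attempt 0: min(100 * 1,  1000) =  100 ms
    // attempt 1: min(100 * 2,  1000) =  200 ms
    // attempt 2: min(100 * 4,  1000) =  400 ms
    // attempt 3: min(100 * 8,  1000) =  800 ms
    // attempt 4: min(100 * 16, 1000) = 1000 ms

So a split that reaches shard end without visible children is retried for roughly 2.5 seconds in total before the reader gives up and reports the split finished with no child shards.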
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
index b79f1e8..6f1943d 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
@@ -23,7 +23,11 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDbStreamsSourceEnumerator;
 
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,6 +45,7 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
     private final StartingPosition startingPosition;
     private final String parentShardId;
     private final boolean isFinished;
+    private final List<Shard> childSplits;
 
     public DynamoDbStreamsShardSplit(
             String streamArn,
@@ -55,6 +60,25 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
             String shardId,
             StartingPosition startingPosition,
             String parentShardId,
+            List<Shard> childSplits) {
+        this(streamArn, shardId, startingPosition, parentShardId, childSplits, false);
+    }
+
+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            boolean isFinished) {
+        this(streamArn, shardId, startingPosition, parentShardId, List.of(), isFinished);
+    }
+
+    public DynamoDbStreamsShardSplit(
+            String streamArn,
+            String shardId,
+            StartingPosition startingPosition,
+            String parentShardId,
+            List<Shard> childSplits,
             boolean isFinished) {
         checkNotNull(streamArn, "streamArn cannot be null");
         checkNotNull(shardId, "shardId cannot be null");
@@ -65,6 +89,7 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
         this.startingPosition = startingPosition;
         this.parentShardId = parentShardId;
         this.isFinished = isFinished;
+        this.childSplits = childSplits;
     }
 
     @Override
@@ -92,6 +117,10 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
         return isFinished;
     }
 
+    public List<Shard> getChildSplits() {
+        return childSplits;
+    }
+
     @Override
     public String toString() {
         return "DynamoDbStreamsShardSplit{"
@@ -108,6 +137,11 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
                 + "]"
                 + ", isFinished="
                 + isFinished
+                + ", childSplitIds=["
+                + childSplits.stream().map(Shard::toString).collect(Collectors.joining(","))
+                + "]"
                 + "}";
     }
 
@@ -124,11 +158,13 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
                 && Objects.equals(shardId, that.shardId)
                 && Objects.equals(startingPosition, that.startingPosition)
                 && Objects.equals(parentShardId, that.parentShardId)
-                && Objects.equals(isFinished, that.isFinished);
+                && Objects.equals(isFinished, that.isFinished)
+                && Objects.equals(childSplits, that.childSplits);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(streamArn, shardId, startingPosition, parentShardId, isFinished);
+        return Objects.hash(
+                streamArn, shardId, startingPosition, parentShardId, isFinished, childSplits);
     }
 }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
index b10bfce..10ddc79 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.io.VersionMismatchException;
 
+import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
 import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 
 import java.io.ByteArrayInputStream;
@@ -29,8 +31,10 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -41,8 +45,8 @@ import java.util.Set;
 public class DynamoDbStreamsShardSplitSerializer
         implements SimpleVersionedSerializer<DynamoDbStreamsShardSplit> {
 
-    private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));
-    private static final int CURRENT_VERSION = 1;
+    private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1, 2));
+    private static final int CURRENT_VERSION = 2;
 
     @Override
     public int getVersion() {
@@ -74,6 +78,18 @@ public class DynamoDbStreamsShardSplitSerializer
             out.writeUTF(split.getParentShardId());
         }
         out.writeBoolean(split.isFinished());
+        out.writeInt(split.getChildSplits().size());
+        for (Shard childSplit : split.getChildSplits()) {
+            out.writeUTF(childSplit.shardId());
+            out.writeUTF(childSplit.parentShardId());
+            out.writeUTF(childSplit.sequenceNumberRange().startingSequenceNumber());
+            if (childSplit.sequenceNumberRange().endingSequenceNumber() == null) {
+                out.writeBoolean(false);
+            } else {
+                out.writeBoolean(true);
+                out.writeUTF(childSplit.sequenceNumberRange().endingSequenceNumber());
+            }
+        }
 
         out.flush();
         return baos.toByteArray();
@@ -116,11 +132,40 @@ public class DynamoDbStreamsShardSplitSerializer
             isFinished = in.readBoolean();
         }
 
+        int childSplitSize = 0;
+        List<Shard> childSplits = new ArrayList<>();
+        if (version > 1) {
+            childSplitSize = in.readInt();
+            if (childSplitSize > 0) {
+                for (int i = 0; i < childSplitSize; i++) {
+                    String splitId = in.readUTF();
+                    String parentSplitId = in.readUTF();
+                    String startingSequenceNumber = in.readUTF();
+                    String endingSequenceNumber = null;
+                    if (in.readBoolean()) {
+                        endingSequenceNumber = in.readUTF();
+                    }
+                    childSplits.add(
+                            Shard.builder()
+                                    .shardId(splitId)
+                                    .parentShardId(parentSplitId)
+                                    .sequenceNumberRange(
+                                            SequenceNumberRange.builder()
+                                                    .startingSequenceNumber(
+                                                            startingSequenceNumber)
+                                                    .endingSequenceNumber(endingSequenceNumber)
+                                                    .build())
+                                    .build());
+                }
+            }
+        }
+
         return new DynamoDbStreamsShardSplit(
                 streamArn,
                 shardId,
                 new StartingPosition(shardIteratorType, startingMarker),
                 parentShardId,
+                childSplits,
                 isFinished);
     }
 }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
index 47e20a1..1ddc137 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
@@ -29,10 +29,12 @@ public class DynamoDbStreamsShardSplitState {
     private final DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit;
     private StartingPosition nextStartingPosition;
     private String nextShardIterator;
+    private boolean shardEndReached;
 
     public DynamoDbStreamsShardSplitState(DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit) {
         this.dynamoDbStreamsShardSplit = dynamoDbStreamsShardSplit;
         this.nextStartingPosition = dynamoDbStreamsShardSplit.getStartingPosition();
+        this.shardEndReached = false;
     }
 
     public DynamoDbStreamsShardSplit getDynamoDbStreamsShardSplit() {
@@ -70,4 +72,12 @@ public class DynamoDbStreamsShardSplitState {
     public void setNextShardIterator(String nextShardIterator) {
         this.nextShardIterator = nextShardIterator;
     }
+
+    public boolean isShardEndReached() {
+        return shardEndReached;
+    }
+
+    public void setShardEndReached(boolean shardEndReached) {
+        this.shardEndReached = shardEndReached;
+    }
 }
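The bump to version 2 only appends fields, so older checkpoints stay readable. A round-trip sketch against the serializer above (the splitWithChildren variable is assumed to exist):

    DynamoDbStreamsShardSplitSerializer serializer = new DynamoDbStreamsShardSplitSerializer();
    byte[] bytes = serializer.serialize(splitWithChildren);       // always written as version 2
    DynamoDbStreamsShardSplit copy = serializer.deserialize(2, bytes);
    // Bytes written by versions 0 or 1 deserialize to a split with an empty
    // child list, because the extra fields are read only when version > 1.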
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
index 031d6de..4c543a3 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
 import org.apache.flink.connector.dynamodb.source.enumerator.assigner.ShardAssignerFactory;
 import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
 import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
 import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
@@ -43,8 +44,8 @@ import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
@@ -304,21 +305,28 @@ class DynamoDbStreamsSourceEnumeratorTest {
                 };
         streamProxy.addShards(childShards);
         enumerator.handleSourceEvent(
-                subtaskId, new SplitsFinishedEvent(Collections.singleton(shards[2].shardId())));
-        // Given no resharding occurs (list of shards remains the same)
-        // When first periodic discovery runs
-        context.runPeriodicCallable(0);
-        // Then no additional splits are assigned
-        SplitsAssignment<DynamoDbStreamsShardSplit> periodicDiscoverySplitAssignment =
-                context.getSplitsAssignmentSequence().get(2);
+                subtaskId,
+                new SplitsFinishedEvent(
+                        Set.of(
+                                new SplitsFinishedEventContext(
+                                        shards[2].shardId(), List.of(childShards[0])))));
+
         DynamoDbStreamsShardSplit childSplit =
                 new DynamoDbStreamsShardSplit(
                         STREAM_ARN,
                         childShards[0].shardId(),
                         StartingPosition.fromStart(),
                         shards[2].shardId());
-        assertThat(periodicDiscoverySplitAssignment.assignment().get(subtaskId))
+        assertThat(context.getSplitsAssignmentSequence().get(1).assignment().get(subtaskId))
                 .containsExactly(childSplit);
+        // Given no resharding occurs (list of shards remains the same)
+        // When first periodic discovery runs
+        context.runPeriodicCallable(0);
+        // Then no additional splits are assigned
+        SplitsAssignment<DynamoDbStreamsShardSplit> periodicDiscoverySplitAssignment =
+                context.getSplitsAssignmentSequence().get(2);
+        assertThat(periodicDiscoverySplitAssignment.assignment().get(subtaskId))
+                .isNullOrEmpty();
     }
 }
@@ -342,7 +350,7 @@ class DynamoDbStreamsSourceEnumeratorTest {
         Instant startTimestamp = Instant.now();
         DynamoDbStreamsSourceEnumeratorState state =
                 new DynamoDbStreamsSourceEnumeratorState(
-                        Collections.singletonList(
+                        List.of(
                                 new DynamoDBStreamsShardSplitWithAssignmentStatus(
                                         new DynamoDbStreamsShardSplit(
                                                 STREAM_ARN,
@@ -425,7 +433,7 @@ class DynamoDbStreamsSourceEnumeratorTest {
                 getTestStreamProxy();
         DynamoDbStreamsSourceEnumerator enumerator =
                 getSimpleEnumeratorWithNoState(context, streamProxy);
-        List<DynamoDbStreamsShardSplit> splits = Collections.singletonList(getTestSplit());
+        List<DynamoDbStreamsShardSplit> splits = List.of(getTestSplit());
 
         // Given enumerator has no assigned splits
         // When we add splits back
@@ -765,7 +773,11 @@ class DynamoDbStreamsSourceEnumeratorTest {
         context.runNextOneTimeCallable();
 
         enumerator.handleSourceEvent(
-                1, new SplitsFinishedEvent(Collections.singleton(completedShard.shardId())));
+                1,
+                new SplitsFinishedEvent(
+                        Set.of(
+                                new SplitsFinishedEventContext(
+                                        completedShard.shardId(), List.of(shards[1])))));
 
         // When restored from state
         DynamoDbStreamsSourceEnumeratorState snapshotState = enumerator.snapshotState(1);
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
index 8c5b204..9c662a7 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
@@ -38,6 +38,8 @@ import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.dynamodb.model.Record;
 import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
 import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
 import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
@@ -50,8 +52,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 
 /** Tests to validate {@link DynamoDbStreamsProxy}. */
 public class DynamoDbStreamsProxyTest {
@@ -88,6 +90,86 @@ public class DynamoDbStreamsProxyTest {
                 .isEqualTo(expectedListShardsResult);
     }
 
+    @Test
+    void testListShardsWithFilterForChildShards() {
+        final String streamArn =
+                "arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-01-01T00:00:00.826";
+        final String parentShardId = "shardId-000000000001";
+
+        // Create child shards that we expect to be returned
+        final List<Shard> childShards =
+                List.of(
+                        Shard.builder()
+                                .shardId("shardId-000000000002")
+                                .parentShardId(parentShardId)
+                                .build(),
+                        Shard.builder()
+                                .shardId("shardId-000000000003")
+                                .parentShardId(parentShardId)
+                                .build());
+
+        // Create some other shards that should not be returned
+        final List<Shard> otherShards =
+                List.of(
+                        Shard.builder()
+                                .shardId("shardId-000000000004")
+                                .parentShardId("different-parent")
+                                .build(),
+                        Shard.builder().shardId("shardId-000000000005").build());
+
+        // Set up the expected response
+        final ListShardsResult expectedResult = new ListShardsResult();
+        expectedResult.addShards(childShards);
+        expectedResult.setStreamStatus(StreamStatus.ENABLED);
+
+        // Create describe stream response with all shards
+        List<Shard> allShards = new ArrayList<>();
+        allShards.addAll(childShards);
+        allShards.addAll(otherShards);
+
+        DescribeStreamResponse describeStreamResponse =
+                DescribeStreamResponse.builder()
+                        .streamDescription(
+                                StreamDescription.builder()
+                                        .shards(allShards)
+                                        .streamStatus(StreamStatus.ENABLED)
+                                        .lastEvaluatedShardId(null)
+                                        .build())
+                        .build();
+
+        TestingDynamoDbStreamsClient testingDynamoDbStreamsClient =
+                new TestingDynamoDbStreamsClient();
+
+        // Verify the correct request is made
+        testingDynamoDbStreamsClient.setDescribeStreamValidation(
+                request -> {
+                    assertThat(request.streamArn()).isEqualTo(streamArn);
+                    assertThat(request.shardFilter()).isNotNull();
+                    assertThat(request.shardFilter().type())
+                            .isEqualTo(ShardFilterType.CHILD_SHARDS);
+                    assertThat(request.shardFilter().shardId()).isEqualTo(parentShardId);
+                });
+
+        testingDynamoDbStreamsClient.setDescribeStreamResponse(describeStreamResponse);
+
+        DynamoDbStreamsProxy dynamoDbStreamsProxy =
+                new DynamoDbStreamsProxy(testingDynamoDbStreamsClient, HTTP_CLIENT);
+
+        // Create the filter for child shards
+        ShardFilter childShardFilter =
+                ShardFilter.builder()
+                        .type(ShardFilterType.CHILD_SHARDS)
+                        .shardId(parentShardId)
+                        .build();
+
+        // Execute the method and verify results
+        ListShardsResult result =
+                dynamoDbStreamsProxy.listShardsWithFilter(streamArn, childShardFilter);
+
+        assertThat(result.getShards()).hasSize(2).containsExactlyInAnyOrderElementsOf(childShards);
+        assertThat(result.getStreamStatus()).isEqualTo(StreamStatus.ENABLED);
+    }
+
     @Test
     void testGetRecordsInitialReadFromTrimHorizon() {
         final String streamArn =
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
index d66c0bb..12e8ac4 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
 import org.apache.flink.connector.dynamodb.source.metrics.DynamoDbStreamsShardMetrics;
 import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
 import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -73,6 +75,7 @@ class DynamoDbStreamsSourceReaderTest {
                         testStreamProxy,
                         NON_EMPTY_POLLING_DELAY_MILLIS,
                         EMPTY_POLLING_DELAY_MILLIS,
+                        new ConcurrentHashMap<>(),
                         shardMetricGroupMap);
 
         testingReaderContext =
@@ -84,6 +87,7 @@ class DynamoDbStreamsSourceReaderTest {
                         new DynamoDbStreamsRecordEmitter<>(null),
                         new Configuration(),
                         testingReaderContext,
+                        new ConcurrentHashMap<>(),
                         shardMetricGroupMap);
     }
 
@@ -122,12 +126,14 @@ class DynamoDbStreamsSourceReaderTest {
 
         List<SourceEvent> events = testingReaderContext.getSentEvents();
 
-        Set<String> expectedSplitIds = Collections.singleton(split.splitId());
+        Set<SplitsFinishedEventContext> expectedFinishedSplits =
+                Collections.singleton(
+                        new SplitsFinishedEventContext(split.splitId(), new ArrayList<>()));
         assertThat(events)
                 .singleElement()
                 .isInstanceOf(SplitsFinishedEvent.class)
                 .usingRecursiveComparison()
-                .isEqualTo(new SplitsFinishedEvent(expectedSplitIds));
+                .isEqualTo(new SplitsFinishedEvent(expectedFinishedSplits));
     }
 
     @Test
@@ -225,8 +231,10 @@ class DynamoDbStreamsSourceReaderTest {
                 .allSatisfy(
                         e -> {
                             SplitsFinishedEvent event = (SplitsFinishedEvent) e;
-                            assertThat(event.getFinishedSplitIds()).hasSize(1);
-                            assertThat(event.getFinishedSplitIds())
+                            assertThat(event.getFinishedSplits()).hasSize(1);
+                            assertThat(
+                                            event.getFinishedSplits().stream()
+                                                    .map(SplitsFinishedEventContext::getSplitId))
                                     .containsAnyOf("finished-split-1", "finished-split-2");
                         });
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java index f3cc169..bcd57cf 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java @@ -73,6 +73,7 @@ class PollingDynamoDbStreamsShardSplitReaderTest { testStreamProxy, NON_EMPTY_POLLING_DELAY_MILLIS, EMPTY_POLLING_DELAY_MILLIS, + new ConcurrentHashMap<>(), shardMetricGroupMap); } @@ -235,12 +236,20 @@ class PollingDynamoDbStreamsShardSplitReaderTest { assertThat(retrievedRecords.finishedSplits()).isEmpty(); fetchedRecords.add(retrievedRecords.nextRecordFromSplit()); } - - assertThat(retrievedRecords.nextSplit()).isNull(); - assertThat(retrievedRecords.finishedSplits()).contains(split.splitId()); assertThat(fetchedRecords) .containsExactlyInAnyOrderElementsOf(expectedRecords); }); + + // Now wait for the split to be marked as finished after child shard discovery attempts + await().pollDelay(NON_EMPTY_POLLING_DELAY_MILLIS) + .atMost(Duration.ofSeconds(30)) // Allow enough time for all retry attempts + .untilAsserted( + () -> { + RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch(); + // No more records should be returned + assertThat(readAllRecords(retrievedRecords)).isEmpty(); + assertThat(retrievedRecords.finishedSplits()).contains(split.splitId()); + }); } @Test @@ -400,6 +409,7 @@ class PollingDynamoDbStreamsShardSplitReaderTest { testStreamProxy, NON_EMPTY_POLLING_DELAY_MILLIS, testEmptyPollDelay, + new ConcurrentHashMap<>(), shardMetricGroupMap); // Immediate second poll - should return empty due to polling delay diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java index ebf1178..410f93b 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java @@ -29,6 +29,7 @@ import java.util.stream.Stream; import static org.apache.flink.connector.dynamodb.source.util.TestUtil.SHARD_ID; import static org.apache.flink.connector.dynamodb.source.util.TestUtil.STREAM_ARN; import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplit; +import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplitWithChildShards; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; @@ -47,6 +48,19 @@ class DynamoDbStreamsShardSplitSerializerTest { assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit); } + @Test + void testSerializeAndDeserializeWithChildSplits() throws Exception { + final DynamoDbStreamsShardSplit initialSplit = 
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
index ebf1178..410f93b 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
@@ -29,6 +29,7 @@ import java.util.stream.Stream;
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.SHARD_ID;
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.STREAM_ARN;
 import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplit;
+import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplitWithChildShards;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
@@ -47,6 +48,19 @@ class DynamoDbStreamsShardSplitSerializerTest {
         assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
     }
 
+    @Test
+    void testSerializeAndDeserializeWithChildSplits() throws Exception {
+        final DynamoDbStreamsShardSplit initialSplit = getTestSplitWithChildShards();
+
+        DynamoDbStreamsShardSplitSerializer serializer = new DynamoDbStreamsShardSplitSerializer();
+
+        byte[] serialized = serializer.serialize(initialSplit);
+        DynamoDbStreamsShardSplit deserializedSplit =
+                serializer.deserialize(serializer.getVersion(), serialized);
+
+        assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
+    }
+
     @ParameterizedTest
     @MethodSource("provideStartingPositions")
     void testSerializeAndDeserializeWithStartingPosition(StartingPosition startingPosition)
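The new round-trip test matters because checkpoints written before this change carry no child-shard section. Flink split serializers typically gate new fields on the serializer version; a self-contained illustration of that pattern (the version number and wire layout here are assumptions, not the connector's actual format) is:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    final class VersionGatedReadSketch {
        // Assumed: the first serializer version whose payload includes child shard ids.
        static final int VERSION_WITH_CHILD_SHARDS = 2;

        // Reads child shard ids only when the payload version carries them, so
        // legacy payloads deserialize to an empty list instead of failing.
        static List<String> readChildShardIds(int version, byte[] serialized) throws IOException {
            List<String> childShardIds = new ArrayList<>();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
                if (version >= VERSION_WITH_CHILD_SHARDS) {
                    int count = in.readInt();
                    for (int i = 0; i < count; i++) {
                        childShardIds.add(in.readUTF());
                    }
                }
            }
            return childShardIds;
        }
    }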
 /** Provides {@link DynamoDbStreamsClient} with mocked DynamoDbStreams behavior. */
 public class DynamoDbStreamsClientProvider {
@@ -84,6 +90,29 @@ public class DynamoDbStreamsClientProvider {
                 throws AwsServiceException, SdkClientException {
             describeStreamValidation.accept(describeStreamRequest);
 
+            ShardFilter shardFilter = describeStreamRequest.shardFilter();
+            if (shardFilter != null && ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
+                List<Shard> shards = describeStreamResponse.streamDescription().shards();
+                List<Shard> childShards =
+                        shards.stream()
+                                .filter(
+                                        shard ->
+                                                shard.parentShardId() != null
+                                                        && shard.parentShardId()
+                                                                .equals(shardFilter.shardId()))
+                                .collect(Collectors.toList());
+                return DescribeStreamResponse.builder()
+                        .streamDescription(
+                                StreamDescription.builder()
+                                        .shards(childShards)
+                                        .streamArn(describeStreamRequest.streamArn())
+                                        .streamStatus(
+                                                describeStreamResponse
+                                                        .streamDescription()
+                                                        .streamStatus())
+                                        .build())
+                        .build();
+            }
             return describeStreamResponse;
         }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
index c9f45c1..7970f73 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
@@ -26,6 +26,8 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
 import software.amazon.awssdk.services.dynamodb.model.Record;
 import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
 
 import javax.annotation.Nullable;
 
@@ -84,6 +86,27 @@ public class DynamoDbStreamsProxyProvider {
             return listShardsResult;
         }
 
+        @Override
+        public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+            if (!ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "ShardFilterType %s not supported", shardFilter.type().name()));
+            }
+
+            ListShardsResult listShardsResult = new ListShardsResult();
+            List<Shard> childShards =
+                    shards.stream()
+                            .filter(
+                                    shard ->
+                                            shard.parentShardId() != null
+                                                    && shard.parentShardId()
+                                                            .equals(shardFilter.shardId()))
+                            .collect(Collectors.toList());
+            listShardsResult.addShards(childShards);
+            return listShardsResult;
+        }
+
         @Override
         public GetRecordsResponse getRecords(
                 String streamArn, String shardId, StartingPosition startingPosition) {
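The test double above mirrors how callers are expected to drive incremental discovery: ask only for the children of a shard that just finished instead of re-listing the whole stream. A usage sketch against the StreamProxy interface extended in this commit (the helper class itself is illustrative; ListShardsResult's import is omitted since its package is not shown in this diff):

    import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;

    import software.amazon.awssdk.services.dynamodb.model.Shard;
    import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
    import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;

    import java.util.List;

    final class ChildShardLookup {
        // Fetches only the child shards of one finished parent shard.
        static List<Shard> childrenOf(StreamProxy proxy, String streamArn, String parentShardId) {
            ShardFilter filter =
                    ShardFilter.builder()
                            .type(ShardFilterType.CHILD_SHARDS)
                            .shardId(parentShardId)
                            .build();
            return proxy.listShardsWithFilter(streamArn, filter).getShards();
        }
    }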
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
index 9c3fcb9..4c15959 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
@@ -35,7 +35,10 @@ import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -45,6 +48,8 @@ public class TestUtil {
     public static final String STREAM_ARN =
             "arn:aws:dynamodb:us-east-1:123456789012:stream/2024-01-01T00:00:00Z";
     public static final String SHARD_ID = "shardId-000000000002";
+    public static final String CHILD_SHARD_ID_1 = "shardId-000000000003";
+    public static final String CHILD_SHARD_ID_2 = "shardId-000000000004";
     public static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
     public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = -1L;
 
@@ -95,6 +100,15 @@ public class TestUtil {
         return getTestSplit(SHARD_ID);
     }
 
+    public static DynamoDbStreamsShardSplit getTestSplitWithChildShards() {
+        return getTestSplitWithChildShards(SHARD_ID);
+    }
+
+    public static DynamoDbStreamsShardSplit getTestSplitWithChildShards(String shardId) {
+        return getTestSplit(
+                STREAM_ARN, shardId, Arrays.asList(CHILD_SHARD_ID_1, CHILD_SHARD_ID_2));
+    }
+
     public static DynamoDbStreamsShardSplit getTestSplit(String shardId) {
         return getTestSplit(STREAM_ARN, shardId);
     }
@@ -104,6 +118,27 @@ public class TestUtil {
                 streamArn, shardId, StartingPosition.fromStart(), null);
     }
 
+    public static DynamoDbStreamsShardSplit getTestSplit(
+            String streamArn, String shardId, List<String> childShardIds) {
+        return new DynamoDbStreamsShardSplit(
+                streamArn,
+                shardId,
+                StartingPosition.fromStart(),
+                null,
+                childShardIds.stream()
+                        .map(
+                                childShardId ->
+                                        Shard.builder()
+                                                .parentShardId(shardId)
+                                                .shardId(childShardId)
+                                                .sequenceNumberRange(
+                                                        SequenceNumberRange.builder()
+                                                                .startingSequenceNumber("1234")
+                                                                .build())
+                                                .build())
+                        .collect(Collectors.toList()));
+    }
+
     public static DynamoDbStreamsShardSplit getTestSplit(StartingPosition startingPosition) {
         return new DynamoDbStreamsShardSplit(STREAM_ARN, SHARD_ID, startingPosition, null);
     }
diff --git a/pom.xml b/pom.xml
index ccb262e..167b40d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@ under the License.
     </scm>
 
     <properties>
-        <aws.sdkv2.version>2.26.19</aws.sdkv2.version>
+        <aws.sdkv2.version>2.40.3</aws.sdkv2.version>
         <netty.version>4.1.86.Final</netty.version>
         <flink.version>2.0.0</flink.version>
         <jackson-bom.version>2.14.3</jackson-bom.version>
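The aws.sdkv2.version bump is what makes the ShardFilter-based code paths above compile: the CHILD_SHARDS filter type comes from the SDK's DynamoDB model package, as the imports in this commit show. A trivial compile-time smoke check (illustrative only; the exact minimum SDK version for CHILD_SHARDS is not established by this diff) is:

    import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
    import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;

    public final class SdkModelSmokeCheck {
        public static void main(String[] args) {
            // Fails to compile against an SDK that lacks ShardFilterType.CHILD_SHARDS.
            ShardFilter filter =
                    ShardFilter.builder()
                            .type(ShardFilterType.CHILD_SHARDS)
                            .shardId("shardId-000000000002")
                            .build();
            System.out.println(filter.type());
        }
    }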
