This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
     new 9d6746b  [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls are made in PollingKinesisShardSplitReader on idle source, causing throttling [FLINK-36939][Connectors/Kinesis] Fix issue where excessive BlockingQueue.poll() calls are made in FanOutKinesisShardSplitReader on idle source, causing high CPU utilisation
9d6746b is described below
commit 9d6746b2e45fb836d0e6f90ae85f33c493952e2b
Author: Keith Lee <[email protected]>
AuthorDate: Thu Apr 3 15:08:41 2025 +0100
    [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls are made in PollingKinesisShardSplitReader on idle source, causing throttling
    [FLINK-36939][Connectors/Kinesis] Fix issue where excessive BlockingQueue.poll() calls are made in FanOutKinesisShardSplitReader on idle source, causing high CPU utilisation
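
    A minimal usage sketch of the new knob (only the option key and its default below are introduced by this change; the Configuration-based setup around it is illustrative):

        import java.time.Duration;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;

        Configuration sourceConfig = new Configuration();
        // Defaults to 250 ms. A larger interval means fewer GetRecords calls
        // (less throttling and CPU) on idle shards, at the cost of slightly
        // higher latency when records first arrive on a previously idle shard.
        sourceConfig.set(
                KinesisSourceConfigOptions.READER_EMPTY_RECORDS_FETCH_INTERVAL,
                Duration.ofMillis(500));

    A standalone sketch of the scheduling mechanism itself appears after the diff.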
---
.../flink-connector-aws-kinesis-streams/pom.xml | 6 +
.../kinesis/source/KinesisStreamsSource.java | 3 +-
.../source/config/KinesisSourceConfigOptions.java | 7 +
.../source/reader/KinesisShardSplitReaderBase.java | 83 +++++++-
.../fanout/FanOutKinesisShardSplitReader.java | 9 +-
.../polling/PollingKinesisShardSplitReader.java | 2 +-
.../reader/KinesisShardSplitReaderBaseTest.java | 229 +++++++++++++++++++++
.../reader/PollingKinesisShardSplitReaderTest.java | 15 +-
.../fanout/FanOutKinesisShardSplitReaderTest.java | 19 +-
.../source/util/KinesisStreamProxyProvider.java | 7 +
.../flink-connector-dynamodb/pom.xml | 1 -
pom.xml | 6 +
12 files changed, 364 insertions(+), 23 deletions(-)
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
index 3ba20c6..836d334 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
@@ -153,6 +153,12 @@ under the License.
<artifactId>flink-architecture-tests-test</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
index 5fba71c..e07add1 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
@@ -81,7 +81,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME;
-import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION;
@@ -228,7 +227,7 @@ public class KinesisStreamsSource<T>
createKinesisAsyncStreamProxy(streamArn, sourceConfig),
consumerArn,
shardMetricGroupMap,
- sourceConfig.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT));
+ sourceConfig);
default:
throw new IllegalArgumentException("Unsupported reader type: "
+ readerType);
}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java
index 92e1889..c310d7c 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java
@@ -85,6 +85,13 @@ public class KinesisSourceConfigOptions {
.defaultValue(ReaderType.POLLING)
.withDescription("The type of reader used to read from the Kinesis stream.");
+ public static final ConfigOption<Duration> READER_EMPTY_RECORDS_FETCH_INTERVAL =
+ ConfigOptions.key("source.reader.empty-records-fetch-interval")
+ .durationType()
+ .defaultValue(Duration.ofMillis(250))
+ .withDescription(
+ "The interval in milliseconds between fetches with empty records");
+
public static final ConfigOption<ConsumerLifecycle> EFO_CONSUMER_LIFECYCLE =
ConfigOptions.key("source.efo.lifecycle")
.enumType(ConsumerLifecycle.class)
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
index 3de9448..e96a910 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
@@ -19,9 +19,11 @@
package org.apache.flink.connector.kinesis.source.reader;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
@@ -38,12 +40,14 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.WeakHashMap;
import static java.util.Collections.singleton;
@@ -60,8 +64,17 @@ public abstract class KinesisShardSplitReaderBase
private final Set<String> pausedSplitIds = new HashSet<>();
private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
- protected KinesisShardSplitReaderBase(Map<String, KinesisShardMetrics> shardMetricGroupMap) {
+ private final long emptyRecordsIntervalMillis;
+
+ private final Map<KinesisShardSplitState, Long> scheduledFetchTimes = new WeakHashMap<>();
+
+ protected KinesisShardSplitReaderBase(
+ Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
this.shardMetricGroupMap = shardMetricGroupMap;
+ this.emptyRecordsIntervalMillis =
+ configuration
+ .get(KinesisSourceConfigOptions.READER_EMPTY_RECORDS_FETCH_INTERVAL)
+ .toMillis();
}
@Override
@@ -69,7 +82,12 @@ public abstract class KinesisShardSplitReaderBase
KinesisShardSplitState splitState = assignedSplits.poll();
// When there are no assigned splits, return quickly
- if (splitState == null) {
+ if (skipWhenNoAssignedSplit(splitState)) {
+ return INCOMPLETE_SHARD_EMPTY_RECORDS;
+ }
+
+ if (skipUntilScheduledFetchTime(splitState)) {
+ assignedSplits.add(splitState);
return INCOMPLETE_SHARD_EMPTY_RECORDS;
}
@@ -82,6 +100,7 @@ public abstract class KinesisShardSplitReaderBase
RecordBatch recordBatch;
try {
recordBatch = fetchRecords(splitState);
+ scheduleNextFetchTime(splitState, recordBatch);
} catch (ResourceNotFoundException e) {
LOG.warn(
"Failed to fetch records from shard {}: shard no longer
exists. Marking split as complete",
@@ -125,6 +144,66 @@ public abstract class KinesisShardSplitReaderBase
recordBatch.isCompleted());
}
+ private boolean skipWhenNoAssignedSplit(KinesisShardSplitState splitState) throws IOException {
+ if (splitState == null) {
+ try {
+ // Small sleep to prevent busy polling
+ Thread.sleep(1);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Sleep was interrupted while skipping no
assigned split", e);
+ }
+ }
+
+ return false;
+ }
+
+ private boolean skipUntilScheduledFetchTime(KinesisShardSplitState splitState)
+ throws IOException {
+ if (scheduledFetchTimes.containsKey(splitState)
+ && scheduledFetchTimes.get(splitState) > System.currentTimeMillis()) {
+ try {
+ // Small sleep to prevent busy polling
+ Thread.sleep(1);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Sleep was interrupted while skipping until scheduled fetch record time",
+ e);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Schedules the next fetch time; to be called immediately on the result of a fetchRecords() call.
+ *
+ * <p>If recordBatch contains no records, the next fetchRecords() is scheduled. Before the
+ * scheduled time, the fetcher thread skips fetching (with a small sleep) for the split.
+ *
+ * <p>If recordBatch is not empty, no next fetchRecords() time is scheduled, so the next
+ * fetch on the split is performed at the first opportunity.
+ *
+ * @param splitState splitState on which fetchRecords() was called
+ * @param recordBatch recordBatch returned by fetchRecords()
+ */
+ private void scheduleNextFetchTime(KinesisShardSplitState splitState, RecordBatch recordBatch) {
+ if (recordBatch == null || recordBatch.getRecords().isEmpty()) {
+ long scheduledGetRecordTimeMillis = System.currentTimeMillis() + emptyRecordsIntervalMillis;
+ this.scheduledFetchTimes.put(splitState, scheduledGetRecordTimeMillis);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Fetched zero records from split {}, scheduling next
fetch to {}",
+ splitState.getSplitId(),
+ new Date(scheduledGetRecordTimeMillis).toInstant());
+ }
+ }
+ }
+
/**
* Main method implementations must implement to fetch records from Kinesis.
*
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
index 8ae2029..c0aefee 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.kinesis.source.reader.fanout;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
@@ -32,6 +33,8 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
+
/**
* An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced
* Fan-Out and HTTP/2.
@@ -48,11 +51,11 @@ public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
AsyncStreamProxy asyncStreamProxy,
String consumerArn,
Map<String, KinesisShardMetrics> shardMetricGroupMap,
- Duration subscriptionTimeout) {
- super(shardMetricGroupMap);
+ Configuration configuration) {
+ super(shardMetricGroupMap, configuration);
this.asyncStreamProxy = asyncStreamProxy;
this.consumerArn = consumerArn;
- this.subscriptionTimeout = subscriptionTimeout;
+ this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
}
@Override
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
index d2d6cd5..c3dca83 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
@@ -44,7 +44,7 @@ public class PollingKinesisShardSplitReader extends KinesisShardSplitReaderBase
StreamProxy kinesisProxy,
Map<String, KinesisShardMetrics> shardMetricGroupMap,
Configuration configuration) {
- super(shardMetricGroupMap);
+ super(shardMetricGroupMap, configuration);
this.kinesis = kinesisProxy;
this.configuration = configuration;
this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java
new file mode 100644
index 0000000..bc8b1b5
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java
@@ -0,0 +1,229 @@
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_EMPTY_RECORDS_FETCH_INTERVAL;
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.await;
+
+class KinesisShardSplitReaderBaseTest {
+
+ private static Configuration newConfigurationForTest() {
+ return new Configuration().set(SHARD_GET_RECORDS_MAX, 50);
+ }
+
+ private static Stream<Arguments> readerTypeAndInterval() {
+ return Stream.of(
+ Arguments.of(NullReturningReader.class, 0L),
+ Arguments.of(NullReturningReader.class, 250L),
+ Arguments.of(NullReturningReader.class, 1000L),
+ Arguments.of(EmptyRecordReturningReader.class, 0L),
+ Arguments.of(EmptyRecordReturningReader.class, 250L),
+ Arguments.of(EmptyRecordReturningReader.class, 1000L));
+ }
+
+ @MethodSource("readerTypeAndInterval")
+ @ParameterizedTest
+ public void testGetRecordsIntervalForIdleSource(
+ Class<? extends CountingReader> readerClass, long interval) {
+ Configuration configuration = newConfigurationForTest();
+ configuration.set(READER_EMPTY_RECORDS_FETCH_INTERVAL, Duration.ofMillis(interval));
+
+ // Given reader with custom interval
+ List<KinesisShardSplit> shardSplits = createShardSplits(8);
+ Map<String, KinesisShardMetrics> metrics = getShardMetrics(shardSplits);
+ CountingReader reader = buildReader(readerClass, configuration, metrics);
+
+ reader.handleSplitsChanges(new SplitsAddition<>(shardSplits));
+
+ // When (empty) records are fetched continuously
+ await().pollInSameThread()
+ .pollInterval(Duration.ofMillis(1))
+ .atMost(interval + 1000L, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ reader.fetch();
+
+ // Then call fetch record at intervals
+ for (List<Long> fetchRecordsCallTimes :
+ reader.getFetchRecordsCallTimestamps().values()) {
+ assertThat(fetchRecordsCallTimes.size()).isEqualTo(2);
+
+ // Ensure interval between fetchRecord calls is between configured
+ // interval and interval + 250 milliseconds (allowing 250
+ // milliseconds of leeway)
+ assertThat(
+ fetchRecordsCallTimes.get(1)
+ - fetchRecordsCallTimes.get(0))
+ .isBetween(interval, interval + 250L);
+ }
+
+ assertThat(reader.getFetchRecordsCallTimestamps().size()).isEqualTo(8);
+ });
+ }
+
+ private static Stream<Arguments> readerTypeAndShardCount() {
+ return Stream.of(
+ Arguments.of(NullReturningReader.class, 1),
+ Arguments.of(NullReturningReader.class, 8),
+ Arguments.of(NullReturningReader.class, 64),
+ Arguments.of(EmptyRecordReturningReader.class, 1),
+ Arguments.of(EmptyRecordReturningReader.class, 8),
+ Arguments.of(EmptyRecordReturningReader.class, 64));
+ }
+
+ @MethodSource("readerTypeAndShardCount")
+ @ParameterizedTest
+ public void testFetchRecordsIntervalForMultipleIdleSource(
+ Class<? extends CountingReader> readerClass, int shardCount) {
+ // Given reader with shard count
+ List<KinesisShardSplit> shardSplits = createShardSplits(shardCount);
+ Map<String, KinesisShardMetrics> metrics = getShardMetrics(shardSplits);
+ CountingReader reader = buildReader(readerClass, newConfigurationForTest(), metrics);
+
+ reader.handleSplitsChanges(new SplitsAddition<>(shardSplits));
+
+ // When (empty) records are fetched continuously
+ await().pollInSameThread()
+ .pollInterval(Duration.ofMillis(1))
+ .atMost(250L + 1000L, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ reader.fetch();
+
+ // Then call fetch record at intervals
+ for (List<Long> fetchRecordCallTime :
+ reader.getFetchRecordsCallTimestamps().values()) {
+ assertThat(fetchRecordCallTime.size()).isEqualTo(2);
+
+ // Default READER_EMPTY_RECORDS_FETCH_INTERVAL value at 250
+ // Ensure interval between fetchRecord calls is between 250 and 500
+ // milliseconds (allowing 250 milliseconds of leeway)
+ assertThat(fetchRecordCallTime.get(1) - fetchRecordCallTime.get(0))
+ .isBetween(250L, 250L + 250L);
+ }
+
+ assertThat(reader.getFetchRecordsCallTimestamps().size()).isEqualTo(shardCount);
+ });
+ }
+
+ private static CountingReader buildReader(
+ Class<? extends CountingReader> readerClass,
+ Configuration configuration,
+ Map<String, KinesisShardMetrics> metrics) {
+ if (readerClass == NullReturningReader.class) {
+ return new NullReturningReader(metrics, configuration);
+ } else if (readerClass == EmptyRecordReturningReader.class) {
+ return new EmptyRecordReturningReader(metrics, configuration);
+ }
+
+ throw new RuntimeException(
+ "No test implementation found for " + readerClass.getCanonicalName());
+ }
+
+ abstract static class CountingReader extends KinesisShardSplitReaderBase {
+
+ private Map<KinesisShardSplitState, List<Long>> fetchRecordsCallTimestamps;
+
+ protected CountingReader(
+ Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
+ super(shardMetricGroupMap, configuration);
+ fetchRecordsCallTimestamps = new HashMap<>();
+ }
+
+ @Override
+ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+ recordFetchTimestamp(splitState);
+
+ return null;
+ }
+
+ private void recordFetchTimestamp(KinesisShardSplitState splitState) {
+ if (fetchRecordsCallTimestamps.containsKey(splitState)) {
+ fetchRecordsCallTimestamps.get(splitState).add(System.currentTimeMillis());
+ } else {
+ ArrayList<Long> fetchRecordCallTime = new ArrayList<>();
+ fetchRecordCallTime.add(System.currentTimeMillis());
+ fetchRecordsCallTimestamps.put(splitState, fetchRecordCallTime);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ public Map<KinesisShardSplitState, List<Long>> getFetchRecordsCallTimestamps() {
+ return fetchRecordsCallTimestamps;
+ }
+ }
+
+ static class NullReturningReader extends CountingReader {
+ public NullReturningReader(
+ Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
+ super(shardMetricGroupMap, configuration);
+ }
+
+ @Override
+ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+ super.fetchRecords(splitState);
+ return null;
+ }
+ }
+
+ static class EmptyRecordReturningReader extends CountingReader {
+ public EmptyRecordReturningReader(
+ Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
+ super(shardMetricGroupMap, configuration);
+ }
+
+ @Override
+ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+ super.fetchRecords(splitState);
+ return new RecordBatch(Collections.emptyList(), 0L, false);
+ }
+ }
+
+ private static List<KinesisShardSplit> createShardSplits(int shardCount) {
+ return IntStream.range(0, shardCount)
+ .mapToObj(shardId -> getTestSplit(generateShardId(shardId)))
+ .collect(Collectors.toList());
+ }
+
+ private static Map<String, KinesisShardMetrics> getShardMetrics(List<KinesisShardSplit> shardSplits) {
+ Map<String, KinesisShardMetrics> metrics = new HashMap<>();
+ MetricListener metricListener = new MetricListener();
+
+ shardSplits.forEach(
+ shardSplit ->
+ metrics.put(
+ shardSplit.splitId(),
+ new KinesisShardMetrics(
+ shardSplit, metricListener.getMetricGroup())));
+
+ return metrics;
+ }
+}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
index 2308b67..e19cc70 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
@@ -57,25 +57,25 @@ class PollingKinesisShardSplitReaderTest {
private TestKinesisStreamProxy testStreamProxy;
private MetricListener metricListener;
private Map<String, KinesisShardMetrics> shardMetricGroupMap;
- private Configuration sourceConfig;
private static final String TEST_SHARD_ID = TestUtil.generateShardId(1);
+ private Configuration newConfigurationForTest() {
+ return new Configuration().set(SHARD_GET_RECORDS_MAX, 50);
+ }
+
@BeforeEach
public void init() {
testStreamProxy = getTestStreamProxy();
metricListener = new MetricListener();
shardMetricGroupMap = new ConcurrentHashMap<>();
- sourceConfig = new Configuration();
- sourceConfig.set(SHARD_GET_RECORDS_MAX, 50);
-
shardMetricGroupMap.put(
TEST_SHARD_ID,
new KinesisShardMetrics(
TestUtil.getTestSplit(TEST_SHARD_ID),
metricListener.getMetricGroup()));
splitReader =
new PollingKinesisShardSplitReader(
- testStreamProxy, shardMetricGroupMap, sourceConfig);
+ testStreamProxy, shardMetricGroupMap, newConfigurationForTest());
}
@Test
@@ -374,10 +374,11 @@ class PollingKinesisShardSplitReaderTest {
@Test
void testMaxRecordsToGetParameterPassed() throws IOException {
int maxRecordsToGet = 2;
- sourceConfig.set(SHARD_GET_RECORDS_MAX, maxRecordsToGet);
+ Configuration configuration = newConfigurationForTest();
+ configuration.set(SHARD_GET_RECORDS_MAX, maxRecordsToGet);
splitReader =
new PollingKinesisShardSplitReader(
- testStreamProxy, shardMetricGroupMap, sourceConfig);
+ testStreamProxy, shardMetricGroupMap, configuration);
testStreamProxy.addShards(TEST_SHARD_ID);
List<Record> sentRecords =
Stream.of(getTestRecord("data-1"), getTestRecord("data-2"),
getTestRecord("data-3"))
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
index e065d4d..fbaaf69 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.kinesis.source.reader.fanout;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
@@ -40,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
import static org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN;
import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
@@ -49,14 +51,17 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
/** Test for {@link FanOutKinesisShardSplitReader}. */
public class FanOutKinesisShardSplitReaderTest {
private static final String TEST_SHARD_ID = TestUtil.generateShardId(1);
- private static final Duration TEST_SUBSCRIPTION_TIMEOUT = Duration.ofMillis(1000);
- SplitReader<Record, KinesisShardSplit> splitReader;
+ FanOutKinesisShardSplitReader splitReader;
private AsyncStreamProxy testAsyncStreamProxy;
private Map<String, KinesisShardMetrics> shardMetricGroupMap;
private MetricListener metricListener;
+ private Configuration newConfigurationForTest() {
+ return new Configuration().set(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT, Duration.ofMillis(1000));
+ }
+
@BeforeEach
public void init() {
metricListener = new MetricListener();
@@ -77,7 +82,7 @@ public class FanOutKinesisShardSplitReaderTest {
testAsyncStreamProxy,
CONSUMER_ARN,
shardMetricGroupMap,
- TEST_SUBSCRIPTION_TIMEOUT);
+ newConfigurationForTest());
RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
@@ -94,7 +99,7 @@ public class FanOutKinesisShardSplitReaderTest {
testAsyncStreamProxy,
CONSUMER_ARN,
shardMetricGroupMap,
- TEST_SUBSCRIPTION_TIMEOUT);
+ newConfigurationForTest());
splitReader.handleSplitsChanges(
new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
@@ -117,7 +122,7 @@ public class FanOutKinesisShardSplitReaderTest {
testAsyncStreamProxy,
CONSUMER_ARN,
shardMetricGroupMap,
- TEST_SUBSCRIPTION_TIMEOUT);
+ newConfigurationForTest());
splitReader.handleSplitsChanges(
new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
@@ -138,7 +143,7 @@ public class FanOutKinesisShardSplitReaderTest {
testAsyncStreamProxy,
CONSUMER_ARN,
shardMetricGroupMap,
- TEST_SUBSCRIPTION_TIMEOUT);
+ newConfigurationForTest());
// When wakeup is called
// Then no exception is thrown and no-op
@@ -155,7 +160,7 @@ public class FanOutKinesisShardSplitReaderTest {
trackCloseStreamProxy,
CONSUMER_ARN,
shardMetricGroupMap,
- TEST_SUBSCRIPTION_TIMEOUT);
+ newConfigurationForTest());
// When split reader is not closed
// Then stream proxy is still open
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
index 5b79430..863b855 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
@@ -94,6 +94,8 @@ public class KinesisStreamProxyProvider {
private final Map<String, Set<String>> efoConsumerRegistration = new HashMap<>();
private final Set<String> consumersCurrentlyDeleting = new HashSet<>();
+ private int totalGetRecordsCall = 0;
+
@Override
public StreamDescriptionSummary getStreamDescriptionSummary(String streamArn) {
return StreamDescriptionSummary.builder()
@@ -137,6 +139,7 @@ public class KinesisStreamProxyProvider {
String shardId,
StartingPosition startingPosition,
int maxRecordsToGet) {
+ totalGetRecordsCall++;
ShardHandle shardHandle = new ShardHandle(streamArn, shardId);
if (getRecordsExceptionSupplier != null) {
@@ -350,6 +353,10 @@ public class KinesisStreamProxyProvider {
return closed;
}
+ public int getTotalGetRecordsCall() {
+ return totalGetRecordsCall;
+ }
+
private static class ShardHandle {
private final String streamArn;
private final String shardId;
diff --git a/flink-connector-aws/flink-connector-dynamodb/pom.xml b/flink-connector-aws/flink-connector-dynamodb/pom.xml
index 3cbb7e0..06ae5cd 100644
--- a/flink-connector-aws/flink-connector-dynamodb/pom.xml
+++ b/flink-connector-aws/flink-connector-dynamodb/pom.xml
@@ -154,7 +154,6 @@ under the License.
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
- <version>4.3.0</version>
<scope>test</scope>
</dependency>
diff --git a/pom.xml b/pom.xml
index d52f695..5f7c556 100644
--- a/pom.xml
+++ b/pom.xml
@@ -412,6 +412,12 @@ under the License.
<artifactId>annotations</artifactId>
<version>17.0.0</version>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ <version>4.3.0</version>
+ </dependency>
</dependencies>
</dependencyManagement>
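
For reference, the scheduling mechanism added to KinesisShardSplitReaderBase, restated as a minimal standalone sketch (class and method names here are illustrative only, not the connector's API):

    import java.util.Map;
    import java.util.WeakHashMap;

    // After an empty fetch, a split is not fetched again until now + interval;
    // non-empty fetches leave the split eligible for an immediate re-fetch.
    class IdleSplitBackoff<S> {
        private final long emptyRecordsIntervalMillis;
        // WeakHashMap mirrors scheduledFetchTimes in the diff: entries vanish
        // once a split state is no longer referenced elsewhere.
        private final Map<S, Long> scheduledFetchTimes = new WeakHashMap<>();

        IdleSplitBackoff(long emptyRecordsIntervalMillis) {
            this.emptyRecordsIntervalMillis = emptyRecordsIntervalMillis;
        }

        // True while the split's scheduled fetch time is still in the future.
        boolean shouldSkip(S split) {
            Long scheduled = scheduledFetchTimes.get(split);
            return scheduled != null && scheduled > System.currentTimeMillis();
        }

        // Call on every fetch result; only empty batches push the next fetch out.
        void onFetchResult(S split, boolean batchWasEmpty) {
            if (batchWasEmpty) {
                scheduledFetchTimes.put(
                        split, System.currentTimeMillis() + emptyRecordsIntervalMillis);
            }
        }
    }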