This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d6746b  [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls in PollingKinesisShardSplitReader are made on idle source causing throttling [FLINK-36939][Connectors/Kinesis] Fix issue where excessive BlockingQueue.poll() calls in FanOutKinesisShardSplitReader are made on idle source causing high CPU utilisation
9d6746b is described below

commit 9d6746b2e45fb836d0e6f90ae85f33c493952e2b
Author: Keith Lee <[email protected]>
AuthorDate: Thu Apr 3 15:08:41 2025 +0100

    [FLINK-36947][Connectors/Kinesis] Fix issue where excessive GetRecords calls in PollingKinesisShardSplitReader are made on idle source causing throttling
    [FLINK-36939][Connectors/Kinesis] Fix issue where excessive BlockingQueue.poll() calls in FanOutKinesisShardSplitReader are made on idle source causing high CPU utilisation
---
 .../flink-connector-aws-kinesis-streams/pom.xml    |   6 +
 .../kinesis/source/KinesisStreamsSource.java       |   3 +-
 .../source/config/KinesisSourceConfigOptions.java  |   7 +
 .../source/reader/KinesisShardSplitReaderBase.java |  83 +++++++-
 .../fanout/FanOutKinesisShardSplitReader.java      |   9 +-
 .../polling/PollingKinesisShardSplitReader.java    |   2 +-
 .../reader/KinesisShardSplitReaderBaseTest.java    | 229 +++++++++++++++++++++
 .../reader/PollingKinesisShardSplitReaderTest.java |  15 +-
 .../fanout/FanOutKinesisShardSplitReaderTest.java  |  19 +-
 .../source/util/KinesisStreamProxyProvider.java    |   7 +
 .../flink-connector-dynamodb/pom.xml               |   1 -
 pom.xml                                            |   6 +
 12 files changed, 364 insertions(+), 23 deletions(-)
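
For context, this commit adds the option source.reader.empty-records-fetch-interval
(KinesisSourceConfigOptions.READER_EMPTY_RECORDS_FETCH_INTERVAL, default 250 ms): when a fetch
returns no records, the split reader schedules its next fetch for that shard after this interval
instead of polling again immediately. A minimal sketch of tuning it via the source configuration;
the 500 ms value and the standalone Configuration setup are illustrative only, not part of this commit:

    import java.time.Duration;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;

    // Sketch only: back off 500 ms (instead of the 250 ms default) between fetches
    // that return no records, reducing GetRecords calls on idle shards.
    Configuration sourceConfig = new Configuration();
    sourceConfig.set(
            KinesisSourceConfigOptions.READER_EMPTY_RECORDS_FETCH_INTERVAL,
            Duration.ofMillis(500));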

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
index 3ba20c6..836d334 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
@@ -153,6 +153,12 @@ under the License.
             <artifactId>flink-architecture-tests-test</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
index 5fba71c..e07add1 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
@@ -81,7 +81,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 
 import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME;
-import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
 import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION;
 import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION;
 import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION;
@@ -228,7 +227,7 @@ public class KinesisStreamsSource<T>
                                 createKinesisAsyncStreamProxy(streamArn, sourceConfig),
                                 consumerArn,
                                 shardMetricGroupMap,
-                                sourceConfig.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT));
+                                sourceConfig);
             default:
                 throw new IllegalArgumentException("Unsupported reader type: " + readerType);
         }
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java
index 92e1889..c310d7c 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java
@@ -85,6 +85,13 @@ public class KinesisSourceConfigOptions {
                     .defaultValue(ReaderType.POLLING)
                     .withDescription("The type of reader used to read from the Kinesis stream.");
 
+    public static final ConfigOption<Duration> READER_EMPTY_RECORDS_FETCH_INTERVAL =
+            ConfigOptions.key("source.reader.empty-records-fetch-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(250))
+                    .withDescription(
+                            "The interval in milliseconds between fetches with empty records");
+
     public static final ConfigOption<ConsumerLifecycle> EFO_CONSUMER_LIFECYCLE =
             ConfigOptions.key("source.efo.lifecycle")
                     .enumType(ConsumerLifecycle.class)
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
index 3de9448..e96a910 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
@@ -19,9 +19,11 @@
 package org.apache.flink.connector.kinesis.source.reader;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
@@ -38,12 +40,14 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.WeakHashMap;
 
 import static java.util.Collections.singleton;
 
@@ -60,8 +64,17 @@ public abstract class KinesisShardSplitReaderBase
     private final Set<String> pausedSplitIds = new HashSet<>();
     private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
 
-    protected KinesisShardSplitReaderBase(Map<String, KinesisShardMetrics> shardMetricGroupMap) {
+    private final long emptyRecordsIntervalMillis;
+
+    private final Map<KinesisShardSplitState, Long> scheduledFetchTimes = new WeakHashMap<>();
+
+    protected KinesisShardSplitReaderBase(
+            Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
         this.shardMetricGroupMap = shardMetricGroupMap;
+        this.emptyRecordsIntervalMillis =
+                configuration
+                        .get(KinesisSourceConfigOptions.READER_EMPTY_RECORDS_FETCH_INTERVAL)
+                        .toMillis();
     }
 
     @Override
@@ -69,7 +82,12 @@ public abstract class KinesisShardSplitReaderBase
         KinesisShardSplitState splitState = assignedSplits.poll();
 
         // When there are no assigned splits, return quickly
-        if (splitState == null) {
+        if (skipWhenNoAssignedSplit(splitState)) {
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        }
+
+        if (skipUntilScheduledFetchTime(splitState)) {
+            assignedSplits.add(splitState);
             return INCOMPLETE_SHARD_EMPTY_RECORDS;
         }
 
@@ -82,6 +100,7 @@ public abstract class KinesisShardSplitReaderBase
         RecordBatch recordBatch;
         try {
             recordBatch = fetchRecords(splitState);
+            scheduleNextFetchTime(splitState, recordBatch);
         } catch (ResourceNotFoundException e) {
             LOG.warn(
                     "Failed to fetch records from shard {}: shard no longer exists. Marking split as complete",
@@ -125,6 +144,66 @@ public abstract class KinesisShardSplitReaderBase
                 recordBatch.isCompleted());
     }
 
+    private boolean skipWhenNoAssignedSplit(KinesisShardSplitState splitState) throws IOException {
+        if (splitState == null) {
+            try {
+                // Small sleep to prevent busy polling
+                Thread.sleep(1);
+                return true;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Sleep was interrupted while skipping no assigned split", e);
+            }
+        }
+
+        return false;
+    }
+
+    private boolean skipUntilScheduledFetchTime(KinesisShardSplitState splitState)
+            throws IOException {
+        if (scheduledFetchTimes.containsKey(splitState)
+                && scheduledFetchTimes.get(splitState) > System.currentTimeMillis()) {
+            try {
+                // Small sleep to prevent busy polling
+                Thread.sleep(1);
+                return true;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException(
+                        "Sleep was interrupted while skipping until scheduled fetch record time",
+                        e);
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Schedules next fetch time, to be called immediately on the result of a fetchRecords() call.
+     *
+     * <p>If recordBatch does not contain records, next fetchRecords() is scheduled. Before
+     * scheduled time, fetcher thread will skip fetching (and have small sleep) for the split.
+     *
+     * <p>If recordBatch is not empty, next fetchRecords() time is not scheduled resulting in next
+     * fetch on the split is performed at first opportunity.
+     *
+     * @param splitState splitState on which the fetchRecords() was called on
+     * @param recordBatch recordBatch returned by fetchRecords()
+     */
+    private void scheduleNextFetchTime(KinesisShardSplitState splitState, RecordBatch recordBatch) {
+        if (recordBatch == null || recordBatch.getRecords().isEmpty()) {
+            long scheduledGetRecordTimeMillis =
+                    System.currentTimeMillis() + emptyRecordsIntervalMillis;
+            this.scheduledFetchTimes.put(splitState, scheduledGetRecordTimeMillis);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Fetched zero records from split {}, scheduling next fetch to {}",
+                        splitState.getSplitId(),
+                        new Date(scheduledGetRecordTimeMillis).toInstant());
+            }
+        }
+    }
+
     /**
      * Main method implementations must implement to fetch records from Kinesis.
      *
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
index 8ae2029..c0aefee 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.kinesis.source.reader.fanout;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
@@ -32,6 +33,8 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
+
 /**
  * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced
  * Fan-Out and HTTP/2.
@@ -48,11 +51,11 @@ public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
             AsyncStreamProxy asyncStreamProxy,
             String consumerArn,
             Map<String, KinesisShardMetrics> shardMetricGroupMap,
-            Duration subscriptionTimeout) {
-        super(shardMetricGroupMap);
+            Configuration configuration) {
+        super(shardMetricGroupMap, configuration);
         this.asyncStreamProxy = asyncStreamProxy;
         this.consumerArn = consumerArn;
-        this.subscriptionTimeout = subscriptionTimeout;
+        this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
     }
 
     @Override
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
index d2d6cd5..c3dca83 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
@@ -44,7 +44,7 @@ public class PollingKinesisShardSplitReader extends KinesisShardSplitReaderBase
             StreamProxy kinesisProxy,
             Map<String, KinesisShardMetrics> shardMetricGroupMap,
             Configuration configuration) {
-        super(shardMetricGroupMap);
+        super(shardMetricGroupMap, configuration);
         this.kinesis = kinesisProxy;
         this.configuration = configuration;
         this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX);
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java
new file mode 100644
index 0000000..bc8b1b5
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java
@@ -0,0 +1,229 @@
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_EMPTY_RECORDS_FETCH_INTERVAL;
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.await;
+
+class KinesisShardSplitReaderBaseTest {
+
+    private static Configuration newConfigurationForTest() {
+        return new Configuration().set(SHARD_GET_RECORDS_MAX, 50);
+    }
+
+    private static Stream<Arguments> readerTypeAndInterval() {
+        return Stream.of(
+                Arguments.of(NullReturningReader.class, 0L),
+                Arguments.of(NullReturningReader.class, 250L),
+                Arguments.of(NullReturningReader.class, 1000L),
+                Arguments.of(EmptyRecordReturningReader.class, 0L),
+                Arguments.of(EmptyRecordReturningReader.class, 250L),
+                Arguments.of(EmptyRecordReturningReader.class, 1000L));
+    }
+
+    @MethodSource("readerTypeAndInterval")
+    @ParameterizedTest
+    public void testGetRecordsIntervalForIdleSource(
+            Class<? extends CountingReader> readerClass, long interval) {
+        Configuration configuration = newConfigurationForTest();
+        configuration.set(READER_EMPTY_RECORDS_FETCH_INTERVAL, Duration.ofMillis(interval));
+
+        // Given reader with custom interval
+        List<KinesisShardSplit> shardSplits = createShardSplits(8);
+        Map<String, KinesisShardMetrics> metrics = getShardMetrics(shardSplits);
+        CountingReader reader = buildReader(readerClass, configuration, metrics);
+
+        reader.handleSplitsChanges(new SplitsAddition<>(shardSplits));
+
+        // When (empty) records are fetched continuously
+        await().pollInSameThread()
+                .pollInterval(Duration.ofMillis(1))
+                .atMost(interval + 1000L, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            reader.fetch();
+
+                            // Then call fetch record at intervals
+                            for (List<Long> fetchRecordsCallTimes :
+                                    reader.getFetchRecordsCallTimestamps().values()) {
+                                assertThat(fetchRecordsCallTimes.size()).isEqualTo(2);
+
+                                // Ensure interval between fetchRecord calls is between configured
+                                // interval and interval + 250 milliseconds (allowing 250
+                                // milliseconds of leeway)
+                                assertThat(
+                                                fetchRecordsCallTimes.get(1)
+                                                        - fetchRecordsCallTimes.get(0))
+                                        .isBetween(interval, interval + 250L);
+                            }
+
+                            assertThat(reader.getFetchRecordsCallTimestamps().size()).isEqualTo(8);
+                        });
+    }
+
+    private static Stream<Arguments> readerTypeAndShardCount() {
+        return Stream.of(
+                Arguments.of(NullReturningReader.class, 1),
+                Arguments.of(NullReturningReader.class, 8),
+                Arguments.of(NullReturningReader.class, 64),
+                Arguments.of(EmptyRecordReturningReader.class, 1),
+                Arguments.of(EmptyRecordReturningReader.class, 8),
+                Arguments.of(EmptyRecordReturningReader.class, 64));
+    }
+
+    @MethodSource("readerTypeAndShardCount")
+    @ParameterizedTest
+    public void testFetchRecordsIntervalForMultipleIdleSource(
+            Class<? extends CountingReader> readerClass, int shardCount) {
+        // Given reader with shard count
+        List<KinesisShardSplit> shardSplits = createShardSplits(shardCount);
+        Map<String, KinesisShardMetrics> metrics = getShardMetrics(shardSplits);
+        CountingReader reader = buildReader(readerClass, newConfigurationForTest(), metrics);
+
+        reader.handleSplitsChanges(new SplitsAddition<>(shardSplits));
+
+        // When (empty) records are fetched continuously
+        await().pollInSameThread()
+                .pollInterval(Duration.ofMillis(1))
+                .atMost(250L + 1000L, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            reader.fetch();
+
+                            // Then call fetch record at intervals
+                            for (List<Long> fetchRecordCallTime :
+                                    reader.getFetchRecordsCallTimestamps().values()) {
+                                assertThat(fetchRecordCallTime.size()).isEqualTo(2);
+
+                                // Default READER_EMPTY_RECORDS_FETCH_INTERVAL value at 250
+                                // Ensure interval between fetchRecord calls is between 250 and 500
+                                // milliseconds (allowing 250 milliseconds of leeway)
+                                assertThat(fetchRecordCallTime.get(1) - fetchRecordCallTime.get(0))
+                                        .isBetween(250L, 250L + 250L);
+                            }
+
+                            assertThat(reader.getFetchRecordsCallTimestamps().size())
+                                    .isEqualTo(shardCount);
+                        });
+    }
+
+    private static CountingReader buildReader(
+            Class<? extends CountingReader> readerClass,
+            Configuration configuration,
+            Map<String, KinesisShardMetrics> metrics) {
+        if (readerClass == NullReturningReader.class) {
+            return new NullReturningReader(metrics, configuration);
+        } else if (readerClass == EmptyRecordReturningReader.class) {
+            return new EmptyRecordReturningReader(metrics, configuration);
+        }
+
+        throw new RuntimeException(
+                "No test implementation found for " + readerClass.getCanonicalName());
+    }
+
+    abstract static class CountingReader extends KinesisShardSplitReaderBase {
+
+        private Map<KinesisShardSplitState, List<Long>> fetchRecordsCallTimestamps;
+
+        protected CountingReader(
+                Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
+            super(shardMetricGroupMap, configuration);
+            fetchRecordsCallTimestamps = new HashMap<>();
+        }
+
+        @Override
+        protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+            recordFetchTimestamp(splitState);
+
+            return null;
+        }
+
+        private void recordFetchTimestamp(KinesisShardSplitState splitState) {
+            if (fetchRecordsCallTimestamps.containsKey(splitState)) {
+                fetchRecordsCallTimestamps.get(splitState).add(System.currentTimeMillis());
+            } else {
+                ArrayList<Long> fetchRecordCallTime = new ArrayList<>();
+                fetchRecordCallTime.add(System.currentTimeMillis());
+                fetchRecordsCallTimestamps.put(splitState, fetchRecordCallTime);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {}
+
+        public Map<KinesisShardSplitState, List<Long>> getFetchRecordsCallTimestamps() {
+            return fetchRecordsCallTimestamps;
+        }
+    }
+
+    static class NullReturningReader extends CountingReader {
+        public NullReturningReader(
+                Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
+            super(shardMetricGroupMap, configuration);
+        }
+
+        @Override
+        protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+            super.fetchRecords(splitState);
+            return null;
+        }
+    }
+
+    static class EmptyRecordReturningReader extends CountingReader {
+        public EmptyRecordReturningReader(
+                Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) {
+            super(shardMetricGroupMap, configuration);
+        }
+
+        @Override
+        protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
+            super.fetchRecords(splitState);
+            return new RecordBatch(Collections.emptyList(), 0L, false);
+        }
+    }
+
+    private static List<KinesisShardSplit> createShardSplits(int shardCount) {
+        return IntStream.range(0, shardCount)
+                .mapToObj(shardId -> getTestSplit(generateShardId(shardId)))
+                .collect(Collectors.toList());
+    }
+
+    private static Map<String, KinesisShardMetrics> getShardMetrics(
+            List<KinesisShardSplit> shardSplits) {
+        Map<String, KinesisShardMetrics> metrics = new HashMap<>();
+        MetricListener metricListener = new MetricListener();
+
+        shardSplits.forEach(
+                shardSplit ->
+                        metrics.put(
+                                shardSplit.splitId(),
+                                new KinesisShardMetrics(
+                                        shardSplit, metricListener.getMetricGroup())));
+
+        return metrics;
+    }
+}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
index 2308b67..e19cc70 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
@@ -57,25 +57,25 @@ class PollingKinesisShardSplitReaderTest {
     private TestKinesisStreamProxy testStreamProxy;
     private MetricListener metricListener;
     private Map<String, KinesisShardMetrics> shardMetricGroupMap;
-    private Configuration sourceConfig;
     private static final String TEST_SHARD_ID = TestUtil.generateShardId(1);
 
+    private Configuration newConfigurationForTest() {
+        return new Configuration().set(SHARD_GET_RECORDS_MAX, 50);
+    }
+
     @BeforeEach
     public void init() {
         testStreamProxy = getTestStreamProxy();
         metricListener = new MetricListener();
         shardMetricGroupMap = new ConcurrentHashMap<>();
 
-        sourceConfig = new Configuration();
-        sourceConfig.set(SHARD_GET_RECORDS_MAX, 50);
-
         shardMetricGroupMap.put(
                 TEST_SHARD_ID,
                 new KinesisShardMetrics(
                         TestUtil.getTestSplit(TEST_SHARD_ID), metricListener.getMetricGroup()));
         splitReader =
                 new PollingKinesisShardSplitReader(
-                        testStreamProxy, shardMetricGroupMap, sourceConfig);
+                        testStreamProxy, shardMetricGroupMap, newConfigurationForTest());
     }
 
     @Test
@@ -374,10 +374,11 @@ class PollingKinesisShardSplitReaderTest {
     @Test
     void testMaxRecordsToGetParameterPassed() throws IOException {
         int maxRecordsToGet = 2;
-        sourceConfig.set(SHARD_GET_RECORDS_MAX, maxRecordsToGet);
+        Configuration configuration = newConfigurationForTest();
+        configuration.set(SHARD_GET_RECORDS_MAX, maxRecordsToGet);
         splitReader =
                 new PollingKinesisShardSplitReader(
-                        testStreamProxy, shardMetricGroupMap, sourceConfig);
+                        testStreamProxy, shardMetricGroupMap, configuration);
         testStreamProxy.addShards(TEST_SHARD_ID);
         List<Record> sentRecords =
                 Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
index e065d4d..fbaaf69 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kinesis.source.reader.fanout;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
 import static org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN;
 import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
@@ -49,14 +51,17 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 /** Test for {@link FanOutKinesisShardSplitReader}. */
 public class FanOutKinesisShardSplitReaderTest {
     private static final String TEST_SHARD_ID = TestUtil.generateShardId(1);
-    private static final Duration TEST_SUBSCRIPTION_TIMEOUT = Duration.ofMillis(1000);
 
-    SplitReader<Record, KinesisShardSplit> splitReader;
+    FanOutKinesisShardSplitReader splitReader;
 
     private AsyncStreamProxy testAsyncStreamProxy;
     private Map<String, KinesisShardMetrics> shardMetricGroupMap;
     private MetricListener metricListener;
 
+    private Configuration newConfigurationForTest() {
+        return new Configuration().set(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT, Duration.ofMillis(1000));
+    }
+
     @BeforeEach
     public void init() {
         metricListener = new MetricListener();
@@ -77,7 +82,7 @@ public class FanOutKinesisShardSplitReaderTest {
                         testAsyncStreamProxy,
                         CONSUMER_ARN,
                         shardMetricGroupMap,
-                        TEST_SUBSCRIPTION_TIMEOUT);
+                        newConfigurationForTest());
         RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
 
         assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
@@ -94,7 +99,7 @@ public class FanOutKinesisShardSplitReaderTest {
                         testAsyncStreamProxy,
                         CONSUMER_ARN,
                         shardMetricGroupMap,
-                        TEST_SUBSCRIPTION_TIMEOUT);
+                        newConfigurationForTest());
         splitReader.handleSplitsChanges(
                 new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
 
@@ -117,7 +122,7 @@ public class FanOutKinesisShardSplitReaderTest {
                         testAsyncStreamProxy,
                         CONSUMER_ARN,
                         shardMetricGroupMap,
-                        TEST_SUBSCRIPTION_TIMEOUT);
+                        newConfigurationForTest());
         splitReader.handleSplitsChanges(
                 new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
 
@@ -138,7 +143,7 @@ public class FanOutKinesisShardSplitReaderTest {
                         testAsyncStreamProxy,
                         CONSUMER_ARN,
                         shardMetricGroupMap,
-                        TEST_SUBSCRIPTION_TIMEOUT);
+                        newConfigurationForTest());
 
         // When wakeup is called
         // Then no exception is thrown and no-op
@@ -155,7 +160,7 @@ public class FanOutKinesisShardSplitReaderTest {
                         trackCloseStreamProxy,
                         CONSUMER_ARN,
                         shardMetricGroupMap,
-                        TEST_SUBSCRIPTION_TIMEOUT);
+                        newConfigurationForTest());
 
         // When split reader is not closed
         // Then stream proxy is still open
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
index 5b79430..863b855 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
@@ -94,6 +94,8 @@ public class KinesisStreamProxyProvider {
         private final Map<String, Set<String>> efoConsumerRegistration = new HashMap<>();
         private final Set<String> consumersCurrentlyDeleting = new HashSet<>();
 
+        private int totalGetRecordsCall = 0;
+
         @Override
         public StreamDescriptionSummary getStreamDescriptionSummary(String streamArn) {
             return StreamDescriptionSummary.builder()
@@ -137,6 +139,7 @@ public class KinesisStreamProxyProvider {
                 String shardId,
                 StartingPosition startingPosition,
                 int maxRecordsToGet) {
+            totalGetRecordsCall++;
             ShardHandle shardHandle = new ShardHandle(streamArn, shardId);
 
             if (getRecordsExceptionSupplier != null) {
@@ -350,6 +353,10 @@ public class KinesisStreamProxyProvider {
             return closed;
         }
 
+        public int getTotalGetRecordsCall() {
+            return totalGetRecordsCall;
+        }
+
         private static class ShardHandle {
             private final String streamArn;
             private final String shardId;
diff --git a/flink-connector-aws/flink-connector-dynamodb/pom.xml b/flink-connector-aws/flink-connector-dynamodb/pom.xml
index 3cbb7e0..06ae5cd 100644
--- a/flink-connector-aws/flink-connector-dynamodb/pom.xml
+++ b/flink-connector-aws/flink-connector-dynamodb/pom.xml
@@ -154,7 +154,6 @@ under the License.
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
-            <version>4.3.0</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/pom.xml b/pom.xml
index d52f695..5f7c556 100644
--- a/pom.xml
+++ b/pom.xml
@@ -412,6 +412,12 @@ under the License.
                 <artifactId>annotations</artifactId>
                 <version>17.0.0</version>
             </dependency>
+            <dependency>
+                <groupId>org.awaitility</groupId>
+                <artifactId>awaitility</artifactId>
+                <scope>test</scope>
+                <version>4.3.0</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

