This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 5cc9710  [FLINK-23802][kinesis][efo] Request next record from the 
Flink source thread rather than the AWS SDK response thread. Increase Read 
Timeout to 6m, higher than the maximum Shard subscription duration (5m) and 
enable TCP keep alive.
5cc9710 is described below

commit 5cc97109f96a3aa908805d41ba3cee83168ee8e8
Author: Danny Cranmer <dannycran...@apache.org>
AuthorDate: Mon Aug 16 09:37:55 2021 +0100

    [FLINK-23802][kinesis][efo] Request next record from the Flink source 
thread rather than the AWS SDK response thread. Increase Read Timeout to 6m, 
higher than the maximum Shard subscription duration (5m) and enable TCP keep 
alive.
---
 .../kinesis/config/ConsumerConfigConstants.java    |   7 +-
 .../publisher/fanout/FanOutShardSubscriber.java    |  17 +--
 .../kinesis/proxy/KinesisProxyV2Factory.java       |   6 +
 .../connectors/kinesis/util/AwsV2Util.java         |  14 ++-
 .../fanout/FanOutRecordPublisherTest.java          |  18 ++-
 .../fanout/FanOutShardSubscriberTest.java          |   2 +-
 .../kinesis/proxy/KinesisProxyV2FactoryTest.java   |  88 +++++++++++++
 .../FakeKinesisFanOutBehavioursFactory.java        | 138 +++++++++++++++------
 .../connectors/kinesis/util/AwsV2UtilTest.java     |  33 ++++-
 9 files changed, 260 insertions(+), 63 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 4ea0bd8..128c173 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -312,6 +312,9 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
     public static final String EFO_HTTP_CLIENT_MAX_CONCURRENCY =
             "flink.stream.efo.http-client.max-concurrency";
 
+    public static final String EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS =
+            "flink.stream.efo.http-client.read-timeout";
+
     // ------------------------------------------------------------------------
     //  Default values for consumer configuration
     // ------------------------------------------------------------------------
@@ -403,7 +406,9 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
 
     public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000;
 
-    public static final int DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY = 10_000;
+    public static final int DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY = 10_000;
+
+    public static final Duration DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT = 
Duration.ofMinutes(6);
 
     /**
      * To avoid shard iterator expires in {@link ShardConsumer}s, the value 
for the configured
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index be4df59..afda248 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -68,14 +68,13 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
  *   <li>{@link SubscriptionErrorEvent} - passes an exception from the network 
to the consumer
  * </ul>
  *
- * <p>The blocking queue has a maximum capacity of 1 record. This allows 
backpressure to be applied
- * closer to the network stack and results in record prefetch. At maximum 
capacity we will have
- * three {@link SubscribeToShardEvent} in memory (per instance of this class):
+ * <p>The blocking queue has a maximum capacity of two. One slot is used for a 
record batch, the
+ * remaining slot is reserved to completion events. At maximum capacity we 
will have two {@link
+ * SubscribeToShardEvent} in memory (per instance of this class):
  *
  * <ul>
  *   <li>1 event being processed by the consumer
  *   <li>1 event enqueued in the blocking queue
- *   <li>1 event being added to the queue by the network (blocking)
  * </ul>
  */
 @Internal
@@ -86,9 +85,11 @@ public class FanOutShardSubscriber {
     /**
      * The maximum capacity of the queue between the network and consumption 
thread. The queue is
      * mainly used to isolate networking from consumption such that errors do 
not bubble up. This
-     * queue also acts as a buffer resulting in a record prefetch and reduced 
latency.
+     * queue also acts as a buffer resulting in a record prefetch and reduced 
latency. Capacity is 2
+     * to allow 1 pending record batch and leave room for a completion event 
to avoid any writer
+     * thread blocking on the queue.
      */
-    private static final int QUEUE_CAPACITY = 1;
+    private static final int QUEUE_CAPACITY = 2;
 
     /**
      * Read timeout will occur after 30 seconds, a sanity timeout to prevent 
lockup in unexpected
@@ -350,6 +351,9 @@ public class FanOutShardSubscriber {
                 result = false;
                 break;
             } else if (subscriptionEvent.isSubscribeToShardEvent()) {
+                // Request for KDS to send the next record batch
+                subscription.requestRecord();
+
                 SubscribeToShardEvent event = 
subscriptionEvent.getSubscribeToShardEvent();
                 continuationSequenceNumber = 
event.continuationSequenceNumber();
                 if (!event.records().isEmpty()) {
@@ -412,7 +416,6 @@ public class FanOutShardSubscriber {
                         @Override
                         public void visit(SubscribeToShardEvent event) {
                             enqueueEvent(new SubscriptionNextEvent(event));
-                            requestRecord();
                         }
                     });
         }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
index 9458b2c..dcab485 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java
@@ -49,6 +49,8 @@ public class KinesisProxyV2Factory {
 
         final ClientConfiguration clientConfiguration =
                 new ClientConfigurationFactory().getConfig();
+        populateDefaultValues(clientConfiguration);
+
         final SdkAsyncHttpClient httpClient =
                 AwsV2Util.createHttpClient(
                         clientConfiguration, 
NettyNioAsyncHttpClient.builder(), configProps);
@@ -59,4 +61,8 @@ public class KinesisProxyV2Factory {
 
         return new KinesisProxyV2(client, httpClient, configuration, BACKOFF);
     }
+
+    private static void populateDefaultValues(final ClientConfiguration 
clientConfiguration) {
+        clientConfiguration.setUseTcpKeepAlive(true);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
index abb88d2..8d5621f 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -53,10 +53,12 @@ import java.time.Duration;
 import java.util.Optional;
 import java.util.Properties;
 
-import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
@@ -100,11 +102,19 @@ public class AwsV2Util {
         int maxConcurrency =
                 
Optional.ofNullable(consumerConfig.getProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY))
                         .map(Integer::parseInt)
-                        .orElse(DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY);
+                        .orElse(DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY);
+
+        Duration readTimeout =
+                
Optional.ofNullable(consumerConfig.getProperty(EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS))
+                        .map(Integer::parseInt)
+                        .map(Duration::ofMillis)
+                        .orElse(DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT);
 
         httpClientBuilder
                 .maxConcurrency(maxConcurrency)
                 
.connectionTimeout(Duration.ofMillis(config.getConnectionTimeout()))
+                .readTimeout(readTimeout)
+                .tcpKeepAlive(config.useTcpKeepAlive())
                 .writeTimeout(Duration.ofMillis(config.getSocketTimeout()))
                 
.connectionMaxIdleTime(Duration.ofMillis(config.getConnectionMaxIdleMillis()))
                 .useIdleConnectionReaper(config.useReaper())
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
index 45cba55..7ce16d3 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
@@ -61,7 +61,6 @@ import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
-import static 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2.NUMBER_OF_SUBSCRIPTIONS;
 import static 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.emptyShard;
 import static 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.singletonShard;
 import static 
org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
@@ -247,16 +246,15 @@ public class FanOutRecordPublisherTest {
         RecordPublisher recordPublisher = createRecordPublisher(kinesis);
         TestConsumer consumer = new TestConsumer();
 
-        int count = 0;
-        while (recordPublisher.run(consumer) == INCOMPLETE) {
-            if (++count > NUMBER_OF_SUBSCRIPTIONS + 1) {
-                break;
-            }
-        }
+        RecordPublisherRunResult result = recordPublisher.run(consumer);
+
+        // An exception is thrown after the 5th record in each subscription, 
therefore we expect to
+        // receive 5 records
+        assertEquals(5, consumer.getRecordBatches().size());
+        assertEquals(1, kinesis.getNumberOfSubscribeToShardInvocations());
 
-        // An exception is thrown on the 5th subscription and then the 
subscription completes on the
-        // next
-        assertEquals(NUMBER_OF_SUBSCRIPTIONS + 1, 
kinesis.getNumberOfSubscribeToShardInvocations());
+        // INCOMPLETE is returned to indicate the shard is not complete
+        assertEquals(INCOMPLETE, result);
     }
 
     @Test
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index c5962df..a41e702 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -141,7 +141,7 @@ public class FanOutShardSubscriberTest {
         thrown.expectMessage("Timed out enqueuing event 
SubscriptionNextEvent");
 
         KinesisProxyV2Interface kinesis =
-                
FakeKinesisFanOutBehavioursFactory.boundedShard().withBatchCount(5).build();
+                
FakeKinesisFanOutBehavioursFactory.shardThatCreatesBackpressureOnQueue();
 
         FanOutShardSubscriber subscriber =
                 new FanOutShardSubscriber(
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2FactoryTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2FactoryTest.java
new file mode 100644
index 0000000..6095b20
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2FactoryTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS;
+import static org.junit.Assert.assertEquals;
+
+/** Test for methods in the {@link KinesisProxyV2Factory} class. */
+public class KinesisProxyV2FactoryTest {
+
+    @Test
+    public void testReadTimeoutPopulatedFromDefaults() throws Exception {
+        Properties properties = properties();
+
+        KinesisProxyV2Interface proxy = 
KinesisProxyV2Factory.createKinesisProxyV2(properties);
+        NettyConfiguration nettyConfiguration = getNettyConfiguration(proxy);
+
+        assertEquals(
+                DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT.toMillis(),
+                nettyConfiguration.readTimeoutMillis());
+    }
+
+    @Test
+    public void testReadTimeoutPopulatedFromProperties() throws Exception {
+        Properties properties = properties();
+        properties.setProperty(EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS, "12345");
+
+        KinesisProxyV2Interface proxy = 
KinesisProxyV2Factory.createKinesisProxyV2(properties);
+        NettyConfiguration nettyConfiguration = getNettyConfiguration(proxy);
+
+        assertEquals(12345, nettyConfiguration.readTimeoutMillis());
+    }
+
+    @Test
+    public void testClientConfigurationPopulatedTcpKeepAliveDefaults() throws 
Exception {
+        Properties properties = properties();
+
+        KinesisProxyV2Interface proxy = 
KinesisProxyV2Factory.createKinesisProxyV2(properties);
+        NettyConfiguration nettyConfiguration = getNettyConfiguration(proxy);
+
+        Assert.assertTrue(nettyConfiguration.tcpKeepAlive());
+    }
+
+    private NettyConfiguration getNettyConfiguration(final 
KinesisProxyV2Interface kinesis)
+            throws Exception {
+        NettyNioAsyncHttpClient httpClient = getField("httpClient", kinesis);
+        return getField("configuration", httpClient);
+    }
+
+    private <T> T getField(String fieldName, Object obj) throws Exception {
+        Field field = obj.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (T) field.get(obj);
+    }
+
+    private Properties properties() {
+        Properties properties = TestUtils.efoProperties();
+        properties.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-2");
+        return properties;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
index 1107efb..88da61c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
@@ -43,15 +43,21 @@ import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHan
 
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static 
software.amazon.awssdk.services.kinesis.model.ConsumerStatus.ACTIVE;
 import static 
software.amazon.awssdk.services.kinesis.model.ConsumerStatus.CREATING;
@@ -100,6 +106,10 @@ public class FakeKinesisFanOutBehavioursFactory {
         return new FailsToAcquireSubscriptionKinesis();
     }
 
+    public static AbstractSingleShardFanOutKinesisV2 
shardThatCreatesBackpressureOnQueue() {
+        return new MultipleEventsForSingleRequest();
+    }
+
     // ------------------------------------------------------------------------
     //  Behaviours related to describing streams
     // ------------------------------------------------------------------------
@@ -140,20 +150,21 @@ public class FakeKinesisFanOutBehavioursFactory {
 
     public static AbstractSingleShardFanOutKinesisV2 
emptyBatchFollowedBySingleRecord() {
         return new AbstractSingleShardFanOutKinesisV2(2) {
-            private int subscription = 0;
+            private int subscriptionCount = 0;
 
             @Override
-            void sendEvents(Subscriber<? super SubscribeToShardEventStream> 
subscriber) {
+            List<SubscribeToShardEvent> getEventsToSend() {
                 SubscribeToShardEvent.Builder builder =
                         SubscribeToShardEvent.builder()
-                                .continuationSequenceNumber(subscription == 0 
? "1" : null);
+                                .continuationSequenceNumber(subscriptionCount 
== 0 ? "1" : null);
 
-                if (subscription == 1) {
+                if (subscriptionCount == 1) {
                     builder.records(createRecord(new AtomicInteger(1)));
                 }
 
-                subscriber.onNext(builder.build());
-                subscription++;
+                subscriptionCount++;
+
+                return Collections.singletonList(builder.build());
             }
         };
     }
@@ -171,15 +182,14 @@ public class FakeKinesisFanOutBehavioursFactory {
         }
 
         @Override
-        void sendEvents(Subscriber<? super SubscribeToShardEventStream> 
subscriber) {
-            if (index % 2 == 0) {
-                super.sendEvents(subscriber);
+        void completeSubscription(Subscriber<? super 
SubscribeToShardEventStream> subscriber) {
+            if (index++ % 2 == 0) {
+                // Fail the subscription
+                super.completeSubscription(subscriber);
             } else {
-                super.sendEventBatch(subscriber);
+                // Do not fail the subscription
                 subscriber.onComplete();
             }
-
-            index++;
         }
     }
 
@@ -203,28 +213,23 @@ public class FakeKinesisFanOutBehavioursFactory {
         }
 
         @Override
-        void sendEvents(Subscriber<? super SubscribeToShardEventStream> 
subscriber) {
-            sendEventBatch(subscriber);
+        List<SubscribeToShardEvent> getEventsToSend() {
+            return generateEvents(NUMBER_OF_EVENTS_PER_SUBSCRIPTION, 
sequenceNumber);
+        }
+
+        @Override
+        void completeSubscription(Subscriber<? super 
SubscribeToShardEventStream> subscriber) {
             try {
                 // Add an artificial delay to allow records to flush
                 Thread.sleep(200);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
+
             for (Throwable throwable : throwables) {
                 subscriber.onError(throwable);
             }
         }
-
-        void sendEventBatch(Subscriber<? super SubscribeToShardEventStream> 
subscriber) {
-            for (int i = 0; i < NUMBER_OF_EVENTS_PER_SUBSCRIPTION; i++) {
-                subscriber.onNext(
-                        SubscribeToShardEvent.builder()
-                                .records(createRecord(sequenceNumber))
-                                .continuationSequenceNumber(String.valueOf(i))
-                                .build());
-            }
-        }
     }
 
     private static class ExceptionalKinesisV2 extends 
KinesisProxyV2InterfaceAdapter {
@@ -253,8 +258,26 @@ public class FakeKinesisFanOutBehavioursFactory {
         }
 
         @Override
-        void sendEvents(Subscriber<? super SubscribeToShardEventStream> 
subscriber) {
-            subscriber.onNext(event);
+        List<SubscribeToShardEvent> getEventsToSend() {
+            return Collections.singletonList(event);
+        }
+    }
+
+    private static class MultipleEventsForSingleRequest extends 
AbstractSingleShardFanOutKinesisV2 {
+
+        private MultipleEventsForSingleRequest() {
+            super(1);
+        }
+
+        @Override
+        List<SubscribeToShardEvent> getEventsToSend() {
+            return generateEvents(2, new AtomicInteger(1));
+        }
+
+        @Override
+        void completeSubscription(Subscriber<? super 
SubscribeToShardEventStream> subscriber) {
+            generateEvents(3, new 
AtomicInteger(2)).forEach(subscriber::onNext);
+            super.completeSubscription(subscriber);
         }
     }
 
@@ -287,7 +310,9 @@ public class FakeKinesisFanOutBehavioursFactory {
         }
 
         @Override
-        void sendEvents(final Subscriber<? super SubscribeToShardEventStream> 
subscriber) {
+        List<SubscribeToShardEvent> getEventsToSend() {
+            List<SubscribeToShardEvent> events = new ArrayList<>();
+
             SubscribeToShardEvent.Builder eventBuilder =
                     
SubscribeToShardEvent.builder().millisBehindLatest(millisBehindLatest);
 
@@ -316,8 +341,10 @@ public class FakeKinesisFanOutBehavioursFactory {
                                 : null;
                 eventBuilder.continuationSequenceNumber(continuation);
 
-                subscriber.onNext(eventBuilder.build());
+                events.add(eventBuilder.build());
             }
+
+            return events;
         }
 
         /** A convenience builder for {@link SingleShardFanOutKinesisV2}. */
@@ -407,31 +434,50 @@ public class FakeKinesisFanOutBehavioursFactory {
                     () -> {
                         responseHandler.responseReceived(
                                 SubscribeToShardResponse.builder().build());
-
                         responseHandler.onEventStream(
                                 subscriber -> {
-                                    
subscriber.onSubscribe(mock(Subscription.class));
+                                    final List<SubscribeToShardEvent> 
eventsToSend;
 
                                     if (remainingSubscriptions > 0) {
-                                        sendEvents(subscriber);
+                                        eventsToSend = getEventsToSend();
                                         remainingSubscriptions--;
                                     } else {
-                                        SubscribeToShardEvent.Builder 
eventBuilder =
-                                                SubscribeToShardEvent.builder()
-                                                        .millisBehindLatest(0L)
-                                                        
.continuationSequenceNumber(null);
-
-                                        
subscriber.onNext(eventBuilder.build());
+                                        eventsToSend =
+                                                Collections.singletonList(
+                                                        
SubscribeToShardEvent.builder()
+                                                                
.millisBehindLatest(0L)
+                                                                
.continuationSequenceNumber(null)
+                                                                .build());
                                     }
 
-                                    subscriber.onComplete();
-                                });
+                                    Subscription subscription = 
mock(Subscription.class);
+                                    Iterator<SubscribeToShardEvent> iterator =
+                                            eventsToSend.iterator();
+
+                                    doAnswer(
+                                                    a -> {
+                                                        if 
(!iterator.hasNext()) {
+                                                            
completeSubscription(subscriber);
+                                                        } else {
+                                                            
subscriber.onNext(iterator.next());
+                                                        }
 
+                                                        return null;
+                                                    })
+                                            .when(subscription)
+                                            .request(anyLong());
+
+                                    subscriber.onSubscribe(subscription);
+                                });
                         return null;
                     });
         }
 
-        abstract void sendEvents(final Subscriber<? super 
SubscribeToShardEventStream> subscriber);
+        void completeSubscription(Subscriber<? super 
SubscribeToShardEventStream> subscriber) {
+            subscriber.onComplete();
+        }
+
+        abstract List<SubscribeToShardEvent> getEventsToSend();
     }
 
     /** A fake Kinesis Proxy V2 that implements dummy logic for stream 
consumer related methods. */
@@ -628,4 +674,16 @@ public class FakeKinesisFanOutBehavioursFactory {
 
         return createRecord(recordAggregator.clearAndGet().toRecordBytes(), 
sequenceNumber);
     }
+
+    private static List<SubscribeToShardEvent> generateEvents(
+            int numberOfEvents, AtomicInteger sequenceNumber) {
+        return IntStream.range(0, numberOfEvents)
+                .mapToObj(
+                        i ->
+                                SubscribeToShardEvent.builder()
+                                        .records(createRecord(sequenceNumber))
+                                        
.continuationSequenceNumber(String.valueOf(i))
+                                        .build())
+                .collect(Collectors.toList());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
index 185817a..42cea41 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -51,11 +51,12 @@ import static 
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigCons
 import static 
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
-import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.LAZY;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.POLLING;
@@ -66,6 +67,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -257,7 +259,7 @@ public class AwsV2UtilTest {
         AwsV2Util.createHttpClient(clientConfiguration, builder, new 
Properties());
 
         verify(builder).build();
-        verify(builder).maxConcurrency(DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY);
+        
verify(builder).maxConcurrency(DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY);
         verify(builder).connectionTimeout(Duration.ofSeconds(10));
         verify(builder).writeTimeout(Duration.ofSeconds(50));
         verify(builder).connectionMaxIdleTime(Duration.ofMinutes(1));
@@ -267,6 +269,31 @@ public class AwsV2UtilTest {
     }
 
     @Test
+    public void testCreateNettyHttpClientReadTimeout() {
+        Properties properties = new Properties();
+        properties.setProperty(EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS, "1234");
+
+        NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+        AwsV2Util.createHttpClient(
+                new ClientConfigurationFactory().getConfig(), builder, 
properties);
+
+        verify(builder).readTimeout(eq(Duration.ofMillis(1234)));
+    }
+
+    @Test
+    public void testCreateNettyHttpClientTcpKeepAlive() {
+        ClientConfiguration clientConfiguration = new 
ClientConfigurationFactory().getConfig();
+        clientConfiguration.setUseTcpKeepAlive(true);
+
+        NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+        AwsV2Util.createHttpClient(clientConfiguration, builder, new 
Properties());
+
+        verify(builder).tcpKeepAlive(true);
+    }
+
+    @Test
     public void testCreateNettyHttpClientConnectionTimeout() {
         ClientConfiguration clientConfiguration = new 
ClientConfigurationFactory().getConfig();
         clientConfiguration.setConnectionTimeout(1000);
@@ -419,6 +446,8 @@ public class AwsV2UtilTest {
         when(builder.connectionAcquisitionTimeout(any())).thenReturn(builder);
         when(builder.protocol(any())).thenReturn(builder);
         
when(builder.http2Configuration(any(Http2Configuration.class))).thenReturn(builder);
+        when(builder.tcpKeepAlive(anyBoolean())).thenReturn(builder);
+        when(builder.readTimeout(any())).thenReturn(builder);
 
         return builder;
     }

Reply via email to