This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new 5cc9710 [FLINK-23802][kinesis][efo] Request next record from the Flink source thread rather than the AWS SDK response thread. Increase Read Timeout to 6m, higher than the maximum Shard subscription duration (5m) and enable TCP keep alive. 5cc9710 is described below commit 5cc97109f96a3aa908805d41ba3cee83168ee8e8 Author: Danny Cranmer <dannycran...@apache.org> AuthorDate: Mon Aug 16 09:37:55 2021 +0100 [FLINK-23802][kinesis][efo] Request next record from the Flink source thread rather than the AWS SDK response thread. Increase Read Timeout to 6m, higher than the maximum Shard subscription duration (5m) and enable TCP keep alive. --- .../kinesis/config/ConsumerConfigConstants.java | 7 +- .../publisher/fanout/FanOutShardSubscriber.java | 17 +-- .../kinesis/proxy/KinesisProxyV2Factory.java | 6 + .../connectors/kinesis/util/AwsV2Util.java | 14 ++- .../fanout/FanOutRecordPublisherTest.java | 18 ++- .../fanout/FanOutShardSubscriberTest.java | 2 +- .../kinesis/proxy/KinesisProxyV2FactoryTest.java | 88 +++++++++++++ .../FakeKinesisFanOutBehavioursFactory.java | 138 +++++++++++++++------ .../connectors/kinesis/util/AwsV2UtilTest.java | 33 ++++- 9 files changed, 260 insertions(+), 63 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 4ea0bd8..128c173 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -312,6 +312,9 @@ public class ConsumerConfigConstants extends AWSConfigConstants { public static final String EFO_HTTP_CLIENT_MAX_CONCURRENCY = "flink.stream.efo.http-client.max-concurrency"; + public static final String EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS = + "flink.stream.efo.http-client.read-timeout"; + // ------------------------------------------------------------------------ // Default values for consumer configuration // ------------------------------------------------------------------------ @@ -403,7 +406,9 @@ public class ConsumerConfigConstants extends AWSConfigConstants { public static final long DEFAULT_WATERMARK_SYNC_MILLIS = 30_000; - public static final int DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY = 10_000; + public static final int DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY = 10_000; + + public static final Duration DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT = Duration.ofMinutes(6); /** * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java index be4df59..afda248 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java @@ -68,14 +68,13 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; * <li>{@link SubscriptionErrorEvent} - passes an exception from the network to the consumer * </ul> * - * <p>The blocking queue has a maximum capacity of 1 record. This allows backpressure to be applied - * closer to the network stack and results in record prefetch. At maximum capacity we will have - * three {@link SubscribeToShardEvent} in memory (per instance of this class): + * <p>The blocking queue has a maximum capacity of two. One slot is used for a record batch, the + * remaining slot is reserved to completion events. At maximum capacity we will have two {@link + * SubscribeToShardEvent} in memory (per instance of this class): * * <ul> * <li>1 event being processed by the consumer * <li>1 event enqueued in the blocking queue - * <li>1 event being added to the queue by the network (blocking) * </ul> */ @Internal @@ -86,9 +85,11 @@ public class FanOutShardSubscriber { /** * The maximum capacity of the queue between the network and consumption thread. The queue is * mainly used to isolate networking from consumption such that errors do not bubble up. This - * queue also acts as a buffer resulting in a record prefetch and reduced latency. + * queue also acts as a buffer resulting in a record prefetch and reduced latency. Capacity is 2 + * to allow 1 pending record batch and leave room for a completion event to avoid any writer + * thread blocking on the queue. */ - private static final int QUEUE_CAPACITY = 1; + private static final int QUEUE_CAPACITY = 2; /** * Read timeout will occur after 30 seconds, a sanity timeout to prevent lockup in unexpected @@ -350,6 +351,9 @@ public class FanOutShardSubscriber { result = false; break; } else if (subscriptionEvent.isSubscribeToShardEvent()) { + // Request for KDS to send the next record batch + subscription.requestRecord(); + SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent(); continuationSequenceNumber = event.continuationSequenceNumber(); if (!event.records().isEmpty()) { @@ -412,7 +416,6 @@ public class FanOutShardSubscriber { @Override public void visit(SubscribeToShardEvent event) { enqueueEvent(new SubscriptionNextEvent(event)); - requestRecord(); } }); } diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java index 9458b2c..dcab485 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java @@ -49,6 +49,8 @@ public class KinesisProxyV2Factory { final ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig(); + populateDefaultValues(clientConfiguration); + final SdkAsyncHttpClient httpClient = AwsV2Util.createHttpClient( clientConfiguration, NettyNioAsyncHttpClient.builder(), configProps); @@ -59,4 +61,8 @@ public class KinesisProxyV2Factory { return new KinesisProxyV2(client, httpClient, configuration, BACKOFF); } + + private static void populateDefaultValues(final ClientConfiguration clientConfiguration) { + clientConfiguration.setUseTcpKeepAlive(true); + } } diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java index abb88d2..8d5621f 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java @@ -53,10 +53,12 @@ import java.time.Duration; import java.util.Optional; import java.util.Properties; -import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_REGISTRATION_TYPE; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO; @@ -100,11 +102,19 @@ public class AwsV2Util { int maxConcurrency = Optional.ofNullable(consumerConfig.getProperty(EFO_HTTP_CLIENT_MAX_CONCURRENCY)) .map(Integer::parseInt) - .orElse(DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY); + .orElse(DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY); + + Duration readTimeout = + Optional.ofNullable(consumerConfig.getProperty(EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS)) + .map(Integer::parseInt) + .map(Duration::ofMillis) + .orElse(DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT); httpClientBuilder .maxConcurrency(maxConcurrency) .connectionTimeout(Duration.ofMillis(config.getConnectionTimeout())) + .readTimeout(readTimeout) + .tcpKeepAlive(config.useTcpKeepAlive()) .writeTimeout(Duration.ofMillis(config.getSocketTimeout())) .connectionMaxIdleTime(Duration.ofMillis(config.getConnectionMaxIdleMillis())) .useIdleConnectionReaper(config.useReaper()) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java index 45cba55..7ce16d3 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java @@ -61,7 +61,6 @@ import static org.apache.flink.streaming.connectors.kinesis.internals.publisher. import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE; import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM; import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM; -import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2.NUMBER_OF_SUBSCRIPTIONS; import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.emptyShard; import static org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.singletonShard; import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle; @@ -247,16 +246,15 @@ public class FanOutRecordPublisherTest { RecordPublisher recordPublisher = createRecordPublisher(kinesis); TestConsumer consumer = new TestConsumer(); - int count = 0; - while (recordPublisher.run(consumer) == INCOMPLETE) { - if (++count > NUMBER_OF_SUBSCRIPTIONS + 1) { - break; - } - } + RecordPublisherRunResult result = recordPublisher.run(consumer); + + // An exception is thrown after the 5th record in each subscription, therefore we expect to + // receive 5 records + assertEquals(5, consumer.getRecordBatches().size()); + assertEquals(1, kinesis.getNumberOfSubscribeToShardInvocations()); - // An exception is thrown on the 5th subscription and then the subscription completes on the - // next - assertEquals(NUMBER_OF_SUBSCRIPTIONS + 1, kinesis.getNumberOfSubscribeToShardInvocations()); + // INCOMPLETE is returned to indicate the shard is not complete + assertEquals(INCOMPLETE, result); } @Test diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java index c5962df..a41e702 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java @@ -141,7 +141,7 @@ public class FanOutShardSubscriberTest { thrown.expectMessage("Timed out enqueuing event SubscriptionNextEvent"); KinesisProxyV2Interface kinesis = - FakeKinesisFanOutBehavioursFactory.boundedShard().withBatchCount(5).build(); + FakeKinesisFanOutBehavioursFactory.shardThatCreatesBackpressureOnQueue(); FanOutShardSubscriber subscriber = new FanOutShardSubscriber( diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2FactoryTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2FactoryTest.java new file mode 100644 index 0000000..6095b20 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2FactoryTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; + +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration; + +import java.lang.reflect.Field; +import java.util.Properties; + +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS; +import static org.junit.Assert.assertEquals; + +/** Test for methods in the {@link KinesisProxyV2Factory} class. */ +public class KinesisProxyV2FactoryTest { + + @Test + public void testReadTimeoutPopulatedFromDefaults() throws Exception { + Properties properties = properties(); + + KinesisProxyV2Interface proxy = KinesisProxyV2Factory.createKinesisProxyV2(properties); + NettyConfiguration nettyConfiguration = getNettyConfiguration(proxy); + + assertEquals( + DEFAULT_EFO_HTTP_CLIENT_READ_TIMEOUT.toMillis(), + nettyConfiguration.readTimeoutMillis()); + } + + @Test + public void testReadTimeoutPopulatedFromProperties() throws Exception { + Properties properties = properties(); + properties.setProperty(EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS, "12345"); + + KinesisProxyV2Interface proxy = KinesisProxyV2Factory.createKinesisProxyV2(properties); + NettyConfiguration nettyConfiguration = getNettyConfiguration(proxy); + + assertEquals(12345, nettyConfiguration.readTimeoutMillis()); + } + + @Test + public void testClientConfigurationPopulatedTcpKeepAliveDefaults() throws Exception { + Properties properties = properties(); + + KinesisProxyV2Interface proxy = KinesisProxyV2Factory.createKinesisProxyV2(properties); + NettyConfiguration nettyConfiguration = getNettyConfiguration(proxy); + + Assert.assertTrue(nettyConfiguration.tcpKeepAlive()); + } + + private NettyConfiguration getNettyConfiguration(final KinesisProxyV2Interface kinesis) + throws Exception { + NettyNioAsyncHttpClient httpClient = getField("httpClient", kinesis); + return getField("configuration", httpClient); + } + + private <T> T getField(String fieldName, Object obj) throws Exception { + Field field = obj.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(obj); + } + + private Properties properties() { + Properties properties = TestUtils.efoProperties(); + properties.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-2"); + return properties; + } +} diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java index 1107efb..88da61c 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java @@ -43,15 +43,21 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHan import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.ACTIVE; import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.CREATING; @@ -100,6 +106,10 @@ public class FakeKinesisFanOutBehavioursFactory { return new FailsToAcquireSubscriptionKinesis(); } + public static AbstractSingleShardFanOutKinesisV2 shardThatCreatesBackpressureOnQueue() { + return new MultipleEventsForSingleRequest(); + } + // ------------------------------------------------------------------------ // Behaviours related to describing streams // ------------------------------------------------------------------------ @@ -140,20 +150,21 @@ public class FakeKinesisFanOutBehavioursFactory { public static AbstractSingleShardFanOutKinesisV2 emptyBatchFollowedBySingleRecord() { return new AbstractSingleShardFanOutKinesisV2(2) { - private int subscription = 0; + private int subscriptionCount = 0; @Override - void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) { + List<SubscribeToShardEvent> getEventsToSend() { SubscribeToShardEvent.Builder builder = SubscribeToShardEvent.builder() - .continuationSequenceNumber(subscription == 0 ? "1" : null); + .continuationSequenceNumber(subscriptionCount == 0 ? "1" : null); - if (subscription == 1) { + if (subscriptionCount == 1) { builder.records(createRecord(new AtomicInteger(1))); } - subscriber.onNext(builder.build()); - subscription++; + subscriptionCount++; + + return Collections.singletonList(builder.build()); } }; } @@ -171,15 +182,14 @@ public class FakeKinesisFanOutBehavioursFactory { } @Override - void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) { - if (index % 2 == 0) { - super.sendEvents(subscriber); + void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) { + if (index++ % 2 == 0) { + // Fail the subscription + super.completeSubscription(subscriber); } else { - super.sendEventBatch(subscriber); + // Do not fail the subscription subscriber.onComplete(); } - - index++; } } @@ -203,28 +213,23 @@ public class FakeKinesisFanOutBehavioursFactory { } @Override - void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) { - sendEventBatch(subscriber); + List<SubscribeToShardEvent> getEventsToSend() { + return generateEvents(NUMBER_OF_EVENTS_PER_SUBSCRIPTION, sequenceNumber); + } + + @Override + void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) { try { // Add an artificial delay to allow records to flush Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } + for (Throwable throwable : throwables) { subscriber.onError(throwable); } } - - void sendEventBatch(Subscriber<? super SubscribeToShardEventStream> subscriber) { - for (int i = 0; i < NUMBER_OF_EVENTS_PER_SUBSCRIPTION; i++) { - subscriber.onNext( - SubscribeToShardEvent.builder() - .records(createRecord(sequenceNumber)) - .continuationSequenceNumber(String.valueOf(i)) - .build()); - } - } } private static class ExceptionalKinesisV2 extends KinesisProxyV2InterfaceAdapter { @@ -253,8 +258,26 @@ public class FakeKinesisFanOutBehavioursFactory { } @Override - void sendEvents(Subscriber<? super SubscribeToShardEventStream> subscriber) { - subscriber.onNext(event); + List<SubscribeToShardEvent> getEventsToSend() { + return Collections.singletonList(event); + } + } + + private static class MultipleEventsForSingleRequest extends AbstractSingleShardFanOutKinesisV2 { + + private MultipleEventsForSingleRequest() { + super(1); + } + + @Override + List<SubscribeToShardEvent> getEventsToSend() { + return generateEvents(2, new AtomicInteger(1)); + } + + @Override + void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) { + generateEvents(3, new AtomicInteger(2)).forEach(subscriber::onNext); + super.completeSubscription(subscriber); } } @@ -287,7 +310,9 @@ public class FakeKinesisFanOutBehavioursFactory { } @Override - void sendEvents(final Subscriber<? super SubscribeToShardEventStream> subscriber) { + List<SubscribeToShardEvent> getEventsToSend() { + List<SubscribeToShardEvent> events = new ArrayList<>(); + SubscribeToShardEvent.Builder eventBuilder = SubscribeToShardEvent.builder().millisBehindLatest(millisBehindLatest); @@ -316,8 +341,10 @@ public class FakeKinesisFanOutBehavioursFactory { : null; eventBuilder.continuationSequenceNumber(continuation); - subscriber.onNext(eventBuilder.build()); + events.add(eventBuilder.build()); } + + return events; } /** A convenience builder for {@link SingleShardFanOutKinesisV2}. */ @@ -407,31 +434,50 @@ public class FakeKinesisFanOutBehavioursFactory { () -> { responseHandler.responseReceived( SubscribeToShardResponse.builder().build()); - responseHandler.onEventStream( subscriber -> { - subscriber.onSubscribe(mock(Subscription.class)); + final List<SubscribeToShardEvent> eventsToSend; if (remainingSubscriptions > 0) { - sendEvents(subscriber); + eventsToSend = getEventsToSend(); remainingSubscriptions--; } else { - SubscribeToShardEvent.Builder eventBuilder = - SubscribeToShardEvent.builder() - .millisBehindLatest(0L) - .continuationSequenceNumber(null); - - subscriber.onNext(eventBuilder.build()); + eventsToSend = + Collections.singletonList( + SubscribeToShardEvent.builder() + .millisBehindLatest(0L) + .continuationSequenceNumber(null) + .build()); } - subscriber.onComplete(); - }); + Subscription subscription = mock(Subscription.class); + Iterator<SubscribeToShardEvent> iterator = + eventsToSend.iterator(); + + doAnswer( + a -> { + if (!iterator.hasNext()) { + completeSubscription(subscriber); + } else { + subscriber.onNext(iterator.next()); + } + return null; + }) + .when(subscription) + .request(anyLong()); + + subscriber.onSubscribe(subscription); + }); return null; }); } - abstract void sendEvents(final Subscriber<? super SubscribeToShardEventStream> subscriber); + void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) { + subscriber.onComplete(); + } + + abstract List<SubscribeToShardEvent> getEventsToSend(); } /** A fake Kinesis Proxy V2 that implements dummy logic for stream consumer related methods. */ @@ -628,4 +674,16 @@ public class FakeKinesisFanOutBehavioursFactory { return createRecord(recordAggregator.clearAndGet().toRecordBytes(), sequenceNumber); } + + private static List<SubscribeToShardEvent> generateEvents( + int numberOfEvents, AtomicInteger sequenceNumber) { + return IntStream.range(0, numberOfEvents) + .mapToObj( + i -> + SubscribeToShardEvent.builder() + .records(createRecord(sequenceNumber)) + .continuationSequenceNumber(String.valueOf(i)) + .build()) + .collect(Collectors.toList()); + } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java index 185817a..42cea41 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java @@ -51,11 +51,12 @@ import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigCons import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn; import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName; import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile; -import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.EAGER; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.LAZY; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_MAX_CONCURRENCY; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.EFO; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType.POLLING; @@ -66,6 +67,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -257,7 +259,7 @@ public class AwsV2UtilTest { AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties()); verify(builder).build(); - verify(builder).maxConcurrency(DEFAULT_EFO_HTTP_CLIENT_MAX_CONURRENCY); + verify(builder).maxConcurrency(DEFAULT_EFO_HTTP_CLIENT_MAX_CONCURRENCY); verify(builder).connectionTimeout(Duration.ofSeconds(10)); verify(builder).writeTimeout(Duration.ofSeconds(50)); verify(builder).connectionMaxIdleTime(Duration.ofMinutes(1)); @@ -267,6 +269,31 @@ public class AwsV2UtilTest { } @Test + public void testCreateNettyHttpClientReadTimeout() { + Properties properties = new Properties(); + properties.setProperty(EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS, "1234"); + + NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder(); + + AwsV2Util.createHttpClient( + new ClientConfigurationFactory().getConfig(), builder, properties); + + verify(builder).readTimeout(eq(Duration.ofMillis(1234))); + } + + @Test + public void testCreateNettyHttpClientTcpKeepAlive() { + ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig(); + clientConfiguration.setUseTcpKeepAlive(true); + + NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder(); + + AwsV2Util.createHttpClient(clientConfiguration, builder, new Properties()); + + verify(builder).tcpKeepAlive(true); + } + + @Test public void testCreateNettyHttpClientConnectionTimeout() { ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig(); clientConfiguration.setConnectionTimeout(1000); @@ -419,6 +446,8 @@ public class AwsV2UtilTest { when(builder.connectionAcquisitionTimeout(any())).thenReturn(builder); when(builder.protocol(any())).thenReturn(builder); when(builder.http2Configuration(any(Http2Configuration.class))).thenReturn(builder); + when(builder.tcpKeepAlive(anyBoolean())).thenReturn(builder); + when(builder.readTimeout(any())).thenReturn(builder); return builder; }