This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch FLINK-21661
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6af2bd8020cd952ca1bc6e2942ddb5af72a7eb2
Author: Danny Cranmer <dannycran...@apache.org>
AuthorDate: Thu Mar 25 08:33:22 2021 +0000

    [FLINK-21933][kinesis] EFO consumer treats interrupts as retryable exceptions (#15347)
---
 .../kinesis/internals/KinesisDataFetcher.java      | 15 ++++++
 .../kinesis/internals/ShardConsumer.java           | 34 ++++++++++--
 .../internals/publisher/RecordPublisher.java       |  5 +-
 .../publisher/fanout/FanOutRecordPublisher.java    |  8 +++
 .../publisher/fanout/FanOutShardSubscriber.java    | 29 +++++++++-
 .../kinesis/internals/KinesisDataFetcherTest.java  | 63 +++++++++++++++-------
 .../kinesis/internals/ShardConsumerFanOutTest.java | 48 +++++++++++++++++
 .../kinesis/internals/ShardConsumerTestUtils.java  | 17 +++++-
 .../fanout/FanOutRecordPublisherTest.java          | 20 ++++++-
 .../fanout/FanOutShardSubscriberTest.java          | 17 ++++++
 10 files changed, 226 insertions(+), 30 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index be4b1f9..163cd04 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -800,6 +800,12 @@ public class KinesisDataFetcher<T> {
      * executed and all shard consuming threads will be interrupted.
      */
    public void shutdownFetcher() {
+        if (LOG.isInfoEnabled()) {
+            LOG.info(
+                    "Starting shutdown of shard consumer threads and AWS SDK resources of subtask {} ...",
+                    indexOfThisConsumerSubtask);
+        }
+
        running = false;

        StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
@@ -825,6 +831,15 @@ public class KinesisDataFetcher<T> {
    }

    /**
+     * Returns a flag indicating if this fetcher is running.
+     *
+     * @return true if the fetcher is running, false if it has been shut down
+     */
+    boolean isRunning() {
+        return running;
+    }
+
+    /**
      * After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the
      * fetcher shutdown.
     */
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index f3abd54..31af3f1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;

 import static java.util.Optional.ofNullable;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.CANCELLED;
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -148,6 +149,11 @@ public class ShardConsumer<T> implements Runnable {
                    // we can close this consumer thread once we've reached the end of the
                    // subscribed shard
                    break;
+                } else if (result == CANCELLED) {
+                    final String errorMessage =
+                            "Shard consumer cancelled: " + subscribedShard.getShard().getShardId();
+                    LOG.info(errorMessage);
+                    throw new ShardConsumerCancelledException(errorMessage);
                }
            }
        } catch (Throwable t) {
@@ -157,13 +163,15 @@ public class ShardConsumer<T> implements Runnable {

    /**
     * The loop in run() checks this before fetching next batch of records. Since this runnable will
-     * be executed by the ExecutorService {@code KinesisDataFetcher#shardConsumersExecutor}, the
-     * only way to close down this thread would be by calling shutdownNow() on {@code
+     * be executed by the ExecutorService {@code KinesisDataFetcher#shardConsumersExecutor}, this
+     * thread would be closed down by calling shutdownNow() on {@code
     * KinesisDataFetcher#shardConsumersExecutor} and let the executor service interrupt all
-     * currently running {@link ShardConsumer}s.
+     * currently running {@link ShardConsumer}s. The AWS SDK resources must be shut down prior to
+     * this thread in order to preserve the classpath for teardown; therefore this also checks
+     * that the fetcher is still running.
     */
    private boolean isRunning() {
-        return !Thread.interrupted();
+        return !Thread.interrupted() && fetcherRef.isRunning();
    }

    /**
@@ -227,4 +235,22 @@ public class ShardConsumer<T> implements Runnable {
        return !record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber())
                || record.getSubSequenceNumber() > lastSequenceNum.getSubSequenceNumber();
    }
+
+    /** An exception wrapper to indicate an error has been thrown from the shard consumer. */
+    abstract static class ShardConsumerException extends RuntimeException {
+        private static final long serialVersionUID = 7732343624482321663L;
+
+        public ShardConsumerException(final String message) {
+            super(message);
+        }
+    }
+
+    /** An exception to indicate the shard consumer has been cancelled. */
+    static class ShardConsumerCancelledException extends ShardConsumerException {
+        private static final long serialVersionUID = 2707399313569728649L;
+
+        public ShardConsumerCancelledException(final String message) {
+            super(message);
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
index 8c70cbd..9da8794 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
@@ -45,7 +45,10 @@ public interface RecordPublisher {
        COMPLETE,

        /** There are more records to consume from this shard. */
-        INCOMPLETE
+        INCOMPLETE,
+
+        /** The record publisher has been cancelled. */
+        CANCELLED
    }

    /**
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
index 2d619ea..5104f8c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberException;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberInterruptedException;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.RecoverableFanOutSubscriberException;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
@@ -43,6 +44,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.function.Consumer;

+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.CANCELLED;
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
 import static software.amazon.awssdk.services.kinesis.model.StartingPosition.builder;
@@ -149,6 +151,12 @@ public class FanOutRecordPublisher implements RecordPublisher {
            fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
                    toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
            attempt = 0;
+        } catch (FanOutSubscriberInterruptedException ex) {
+            LOG.info(
+                    "Thread interrupted, closing record publisher for shard {}.",
+                    subscribedShard.getShard().getShardId(),
+                    ex);
+            return CANCELLED;
        } catch (RecoverableFanOutSubscriberException ex) {
            // Recoverable errors should be reattempted without contributing to the retry policy
            // A recoverable error would not result in the Flink job being cancelled
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index 252ab5e..e06923f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -232,7 +232,9 @@ public class FanOutShardSubscriber {
                consumerArn,
                cause);

-        if (cause instanceof ReadTimeoutException) {
+        if (isInterrupted(throwable)) {
+            throw new FanOutSubscriberInterruptedException(throwable);
+        } else if (cause instanceof ReadTimeoutException) {
            // ReadTimeoutException occurs naturally under backpressure scenarios when full batches
            // take longer to
            // process than standard read timeout (default 30s). Recoverable exceptions are intended
@@ -246,6 +248,19 @@ public class FanOutShardSubscriber {
        }
    }

+    private boolean isInterrupted(final Throwable throwable) {
+        Throwable cause = throwable;
+        while (cause != null) {
+            if (cause instanceof InterruptedException) {
+                return true;
+            }
+
+            cause = cause.getCause();
+        }
+
+        return false;
+    }
+
    /**
     * Once the subscription is open, records will be delivered to the {@link BlockingQueue}. Queue
     * capacity is hardcoded to 1 record, the queue is used solely to separate consumption and
@@ -277,7 +292,7 @@ public class FanOutShardSubscriber {
        }

        if (subscriptionEvent == null) {
-            LOG.debug(
+            LOG.info(
                    "Timed out polling events from network, reacquiring subscription - {} ({})",
                    shardId,
                    consumerArn);
@@ -438,6 +453,16 @@ public class FanOutShardSubscriber {
        }
    }

+    /** An exception wrapper to indicate the subscriber has been interrupted. */
+    static class FanOutSubscriberInterruptedException extends FanOutSubscriberException {
+
+        private static final long serialVersionUID = -2783477408630427189L;
+
+        public FanOutSubscriberInterruptedException(Throwable cause) {
+            super(cause);
+        }
+    }
+
    /**
     * An interface used to pass messages between {@link FanOutShardSubscription} and {@link
     * FanOutShardSubscriber} via the {@link BlockingQueue}.
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 48c7ef2..2cd5f33 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -55,6 +55,7 @@ import org.junit.Test;
 import org.powermock.reflect.Whitebox;

 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -68,6 +69,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;

 import static java.util.Collections.singletonList;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
@@ -86,28 +88,28 @@ import static org.mockito.Mockito.when;
 /** Tests for the {@link KinesisDataFetcher}. */
 public class KinesisDataFetcherTest extends TestLogger {

-    @Test(expected = RuntimeException.class)
-    public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
-        List<String> fakeStreams = new LinkedList<>();
-        fakeStreams.add("fakeStream1");
-        fakeStreams.add("fakeStream2");
+    @Test
+    public void testIsRunning() {
+        KinesisDataFetcher<String> fetcher =
+                createTestDataFetcherWithNoShards(10, 2, "test-stream");

-        HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
-                KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(
-                        fakeStreams);
+        assertTrue(fetcher.isRunning());
+    }

-        TestableKinesisDataFetcher<String> fetcher =
-                new TestableKinesisDataFetcher<>(
-                        fakeStreams,
-                        new TestSourceContext<>(),
-                        TestUtils.getStandardProperties(),
-                        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
-                        10,
-                        2,
-                        new AtomicReference<>(),
-                        new LinkedList<>(),
-                        subscribedStreamsToLastSeenShardIdsUnderTest,
-                        FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour());
+    @Test
+    public void testIsRunningFalseAfterShutDown() {
+        KinesisDataFetcher<String> fetcher =
+                createTestDataFetcherWithNoShards(10, 2, "test-stream");
+
+        fetcher.shutdownFetcher();
+
+        assertFalse(fetcher.isRunning());
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
+        KinesisDataFetcher<String> fetcher =
+                createTestDataFetcherWithNoShards(10, 2, "fakeStream1", "fakeStream2");

        fetcher.runFetcher(); // this should throw RuntimeException
    }
@@ -1014,4 +1016,25 @@ public class KinesisDataFetcherTest extends TestLogger {

        verify(kinesisV2).close();
    }
+
+    private KinesisDataFetcher<String> createTestDataFetcherWithNoShards(
+            final int subtaskCount, final int subtaskIndex, final String... streams) {
+        List<String> streamList = Arrays.stream(streams).collect(Collectors.toList());
+
+        HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+                KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(
+                        streamList);
+
+        return new TestableKinesisDataFetcher<String>(
+                streamList,
+                new TestSourceContext<>(),
+                TestUtils.getStandardProperties(),
+                new KinesisDeserializationSchemaWrapper<String>(new SimpleStringSchema()),
+                subtaskCount,
+                subtaskIndex,
+                new AtomicReference<>(),
+                new LinkedList<>(),
+                subscribedStreamsToLastSeenShardIdsUnderTest,
+                FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour());
+    }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
index 5e4328d..04a6692 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOut
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SingleShardFanOutKinesisV2;

+import com.amazonaws.SdkClientException;
+import com.amazonaws.http.timers.client.SdkInterruptedException;
 import org.junit.Test;
 import software.amazon.awssdk.services.kinesis.model.StartingPosition;

@@ -34,6 +36,7 @@ import java.util.Properties;

 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX;
 import static org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.fakeSequenceNumber;
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
@@ -219,6 +222,51 @@ public class ShardConsumerFanOutTest {
                kinesis.getStartingPositionForSubscription(4), "8");
    }

+    @Test
+    public void testShardConsumerExitsWhenRecordPublisherIsInterrupted() throws Exception {
+        // Throws an error after 5 records
+        KinesisProxyV2Interface kinesis =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new SdkInterruptedException(null));
+
+        int expectedNumberOfRecordsReadFromKinesisBeforeError = 5;
+        SequenceNumber startingSequenceNumber = new SequenceNumber("0");
+        SequenceNumber expectedLastProcessSequenceNumber = new SequenceNumber("5");
+
+        // SdkInterruptedException will terminate the consumer; it will not retry and will read
+        // only the first 5 records
+        ShardConsumerTestUtils.assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfRecordsReadFromKinesisBeforeError,
+                new FanOutRecordPublisherFactory(kinesis),
+                startingSequenceNumber,
+                efoProperties(),
+                expectedLastProcessSequenceNumber);
+    }
+
+    @Test
+    public void testShardConsumerRetriesGenericSdkError() throws Exception {
+        // Throws an error after 5 records; there are 25 records available in the shard
+        KinesisProxyV2Interface kinesis =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new SdkClientException(""));
+
+        int expectedNumberOfRecordsReadFromKinesisBeforeError = 25;
+        SequenceNumber startingSequenceNumber = new SequenceNumber("0");
+
+        Properties properties = efoProperties();
+        // Speed up test by reducing backoff time
+        properties.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_MAX, "1");
+
+        // SdkClientException will cause a retry; each retry will result in 5 more records being
+        // consumed, so the shard will consume all 25 records
+        assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfRecordsReadFromKinesisBeforeError,
+                kinesis,
+                startingSequenceNumber,
+                properties);
+    }
+
    private void assertStartingPositionAfterSequenceNumber(
            final StartingPosition startingPosition, final String sequenceNumber) {
        assertEquals(AFTER_SEQUENCE_NUMBER, startingPosition.type());
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
index fe3f058..c34b60e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
@@ -58,6 +58,21 @@ public class ShardConsumerTestUtils {
            final SequenceNumber startingSequenceNumber,
            final Properties consumerProperties)
            throws InterruptedException {
+        return assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfMessages,
+                recordPublisherFactory,
+                startingSequenceNumber,
+                consumerProperties,
+                SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
+    }
+
+    public static <T> ShardConsumerMetricsReporter assertNumberOfMessagesReceivedFromKinesis(
+            final int expectedNumberOfMessages,
+            final RecordPublisherFactory recordPublisherFactory,
+            final SequenceNumber startingSequenceNumber,
+            final Properties consumerProperties,
+            final SequenceNumber expectedLastProcessedSequenceNum)
+            throws InterruptedException {
        ShardConsumerMetricsReporter shardMetricsReporter =
                new ShardConsumerMetricsReporter(mock(MetricGroup.class));

@@ -118,7 +133,7 @@ public class ShardConsumerTestUtils {
        assertEquals(expectedNumberOfMessages, sourceContext.getCollectedOutputs().size());
        assertEquals(
-                SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
+                expectedLastProcessedSequenceNum,
                subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());

        return shardMetricsReporter;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
index 07fec0d..45cba55 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout

 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
 import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
@@ -28,6 +29,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOut
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.TestConsumer;

+import com.amazonaws.http.timers.client.SdkInterruptedException;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import io.netty.handler.timeout.ReadTimeoutException;
 import org.hamcrest.Matchers;
@@ -54,6 +56,7 @@ import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES;
+import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.CANCELLED;
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
@@ -342,8 +345,7 @@ public class FanOutRecordPublisherTest {
                        backoff);

        int count = 0;
-        while (recordPublisher.run(new TestConsumer())
-                == RecordPublisher.RecordPublisherRunResult.INCOMPLETE) {
+        while (recordPublisher.run(new TestConsumer()) == RecordPublisherRunResult.INCOMPLETE) {
            if (++count > EXPECTED_SUBSCRIBE_TO_SHARD_RETRIES) {
                break;
            }
@@ -488,6 +490,20 @@ public class FanOutRecordPublisherTest {
        }
    }

+    @Test
+    public void testInterruptedPublisherReturnsCancelled() throws Exception {
+        KinesisProxyV2Interface kinesis =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new SdkInterruptedException(null));
+
+        RecordPublisher publisher =
+                createRecordPublisher(
+                        kinesis, StartingPosition.continueFromSequenceNumber(SEQUENCE_NUMBER));
+        RecordPublisherRunResult actual = publisher.run(new TestConsumer());
+
+        assertEquals(CANCELLED, actual);
+    }
+
    private List<UserRecord> flattenToUserRecords(final List<RecordBatch> recordBatch) {
        return recordBatch.stream()
                .flatMap(b -> b.getDeaggregatedRecords().stream())
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index 3c6846e..0061cdc 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2;

+import com.amazonaws.http.timers.client.SdkInterruptedException;
 import io.netty.handler.timeout.ReadTimeoutException;
 import org.junit.Rule;
 import org.junit.Test;
@@ -66,6 +67,22 @@ public class FanOutShardSubscriberTest {
    }

    @Test
+    public void testInterruptedErrorThrownToConsumer() throws Exception {
+        thrown.expect(FanOutShardSubscriber.FanOutSubscriberInterruptedException.class);
+
+        SdkInterruptedException error = new SdkInterruptedException(null);
+        SubscriptionErrorKinesisV2 errorKinesisV2 =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error);
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber("consumerArn", "shardId", errorKinesisV2);
+
+        software.amazon.awssdk.services.kinesis.model.StartingPosition startingPosition =
+                software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+    }
+
+    @Test
    public void testMultipleErrorsThrownPassesFirstErrorToConsumer() throws Exception {
        thrown.expect(FanOutShardSubscriber.FanOutSubscriberException.class);
        thrown.expectMessage("Error 1!");
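
For reference, the core of this change is the cause-chain walk added in FanOutShardSubscriber#isInterrupted: an interrupt can surface as an InterruptedException wrapped several layers deep (e.g. inside an SdkInterruptedException from the AWS SDK), and it must be classified as a cancellation rather than a retryable error. The sketch below is illustrative only and not part of the commit; the class name InterruptDetectionSketch and its main method are hypothetical, while the isInterrupted body mirrors the committed logic.

    import java.util.concurrent.CompletionException;

    // Illustrative sketch (not part of the commit): walks a Throwable's cause
    // chain to detect a wrapped InterruptedException, mirroring the logic
    // added in FanOutShardSubscriber#isInterrupted above.
    public final class InterruptDetectionSketch {

        static boolean isInterrupted(final Throwable throwable) {
            Throwable cause = throwable;
            while (cause != null) {
                if (cause instanceof InterruptedException) {
                    return true;
                }
                cause = cause.getCause();
            }
            return false;
        }

        public static void main(String[] args) {
            // An InterruptedException buried under wrapper exceptions is still detected ...
            Throwable wrapped =
                    new CompletionException(new RuntimeException(new InterruptedException()));
            System.out.println(isInterrupted(wrapped)); // prints: true

            // ... while an unrelated exception is not, and stays on the retry path.
            System.out.println(isInterrupted(new RuntimeException("read timed out"))); // prints: false
        }
    }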