This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch FLINK-21661
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6af2bd8020cd952ca1bc6e2942ddb5af72a7eb2
Author: Danny Cranmer <dannycran...@apache.org>
AuthorDate: Thu Mar 25 08:33:22 2021 +0000

    [FLINK-21933][kinesis] EFO consumer treats interrupts as retryable exceptions (#15347)
---
 .../kinesis/internals/KinesisDataFetcher.java      | 15 ++++++
 .../kinesis/internals/ShardConsumer.java           | 34 ++++++++++--
 .../internals/publisher/RecordPublisher.java       |  5 +-
 .../publisher/fanout/FanOutRecordPublisher.java    |  8 +++
 .../publisher/fanout/FanOutShardSubscriber.java    | 29 +++++++++-
 .../kinesis/internals/KinesisDataFetcherTest.java  | 63 +++++++++++++++-------
 .../kinesis/internals/ShardConsumerFanOutTest.java | 48 +++++++++++++++++
 .../kinesis/internals/ShardConsumerTestUtils.java  | 17 +++++-
 .../fanout/FanOutRecordPublisherTest.java          | 20 ++++++-
 .../fanout/FanOutShardSubscriberTest.java          | 17 ++++++
 10 files changed, 226 insertions(+), 30 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index be4b1f9..163cd04 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -800,6 +800,12 @@ public class KinesisDataFetcher<T> {
      * executed and all shard consuming threads will be interrupted.
      */
     public void shutdownFetcher() {
+        if (LOG.isInfoEnabled()) {
+            LOG.info(
+                    "Starting shutdown of shard consumer threads and AWS SDK 
resources of subtask {} ...",
+                    indexOfThisConsumerSubtask);
+        }
+
         running = false;
 
         StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, 
streams);
@@ -825,6 +831,15 @@ public class KinesisDataFetcher<T> {
     }
 
     /**
+     * Returns a flag indicating if this fetcher is running.
+     *
+     * @return true if the fetcher is running, false if it has been shut down
+     */
+    boolean isRunning() {
+        return running;
+    }
+
+    /**
      * After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be 
called to await the
      * fetcher shutdown.
      */
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index f3abd54..31af3f1 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static java.util.Optional.ofNullable;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.CANCELLED;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -148,6 +149,11 @@ public class ShardConsumer<T> implements Runnable {
                     // we can close this consumer thread once we've reached 
the end of the
                     // subscribed shard
                     break;
+                } else if (result == CANCELLED) {
+                    final String errorMessage =
+                            "Shard consumer cancelled: " + 
subscribedShard.getShard().getShardId();
+                    LOG.info(errorMessage);
+                    throw new ShardConsumerCancelledException(errorMessage);
                 }
             }
         } catch (Throwable t) {
@@ -157,13 +163,15 @@ public class ShardConsumer<T> implements Runnable {
 
     /**
      * The loop in run() checks this before fetching next batch of records. 
Since this runnable will
-     * be executed by the ExecutorService {@code 
KinesisDataFetcher#shardConsumersExecutor}, the
-     * only way to close down this thread would be by calling shutdownNow() on 
{@code
+     * be executed by the ExecutorService {@code 
KinesisDataFetcher#shardConsumersExecutor}, this
+     * thread would be closed down by calling shutdownNow() on {@code
      * KinesisDataFetcher#shardConsumersExecutor} and let the executor service 
interrupt all
-     * currently running {@link ShardConsumer}s.
+     * currently running {@link ShardConsumer}s. The AWS SDK resources must be shut down prior to
+     * this thread in order to preserve the classpath for teardown, so this method also checks
+     * whether the fetcher is still running.
      */
     private boolean isRunning() {
-        return !Thread.interrupted();
+        return !Thread.interrupted() && fetcherRef.isRunning();
     }
 
     /**
@@ -227,4 +235,22 @@ public class ShardConsumer<T> implements Runnable {
         return 
!record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber())
                 || record.getSubSequenceNumber() > 
lastSequenceNum.getSubSequenceNumber();
     }
+
+    /** An exception wrapper to indicate an error has been thrown from the shard consumer. */
+    abstract static class ShardConsumerException extends RuntimeException {
+        private static final long serialVersionUID = 7732343624482321663L;
+
+        public ShardConsumerException(final String message) {
+            super(message);
+        }
+    }
+
+    /** An exception to indicate the shard consumer has been cancelled. */
+    static class ShardConsumerCancelledException extends 
ShardConsumerException {
+        private static final long serialVersionUID = 2707399313569728649L;
+
+        public ShardConsumerCancelledException(final String message) {
+            super(message);
+        }
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
index 8c70cbd..9da8794 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisher.java
@@ -45,7 +45,10 @@ public interface RecordPublisher {
         COMPLETE,
 
         /** There are more records to consume from this shard. */
-        INCOMPLETE
+        INCOMPLETE,
+
+        /** The record publisher has been cancelled. */
+        CANCELLED
     }
 
     /**
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
index 2d619ea..5104f8c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberException;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberInterruptedException;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.RecoverableFanOutSubscriberException;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
@@ -43,6 +44,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.function.Consumer;
 
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.CANCELLED;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
 import static 
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder;
@@ -149,6 +151,12 @@ public class FanOutRecordPublisher implements 
RecordPublisher {
                     fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
                             toSdkV2StartingPosition(nextStartingPosition), 
eventConsumer);
             attempt = 0;
+        } catch (FanOutSubscriberInterruptedException ex) {
+            LOG.info(
+                    "Thread interrupted, closing record publisher for shard 
{}.",
+                    subscribedShard.getShard().getShardId(),
+                    ex);
+            return CANCELLED;
         } catch (RecoverableFanOutSubscriberException ex) {
             // Recoverable errors should be reattempted without contributing 
to the retry policy
             // A recoverable error would not result in the Flink job being 
cancelled
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index 252ab5e..e06923f 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -232,7 +232,9 @@ public class FanOutShardSubscriber {
                 consumerArn,
                 cause);
 
-        if (cause instanceof ReadTimeoutException) {
+        if (isInterrupted(throwable)) {
+            throw new FanOutSubscriberInterruptedException(throwable);
+        } else if (cause instanceof ReadTimeoutException) {
             // ReadTimeoutException occurs naturally under backpressure 
scenarios when full batches
             // take longer to
             // process than standard read timeout (default 30s). Recoverable 
exceptions are intended
@@ -246,6 +248,19 @@ public class FanOutShardSubscriber {
         }
     }
 
+    private boolean isInterrupted(final Throwable throwable) {
+        Throwable cause = throwable;
+        while (cause != null) {
+            if (cause instanceof InterruptedException) {
+                return true;
+            }
+
+            cause = cause.getCause();
+        }
+
+        return false;
+    }
+
     /**
      * Once the subscription is open, records will be delivered to the {@link 
BlockingQueue}. Queue
      * capacity is hardcoded to 1 record, the queue is used solely to separate 
consumption and
@@ -277,7 +292,7 @@ public class FanOutShardSubscriber {
             }
 
             if (subscriptionEvent == null) {
-                LOG.debug(
+                LOG.info(
                         "Timed out polling events from network, reacquiring 
subscription - {} ({})",
                         shardId,
                         consumerArn);
@@ -438,6 +453,16 @@ public class FanOutShardSubscriber {
         }
     }
 
+    /** An exception wrapper to indicate the subscriber has been interrupted. */
+    static class FanOutSubscriberInterruptedException extends 
FanOutSubscriberException {
+
+        private static final long serialVersionUID = -2783477408630427189L;
+
+        public FanOutSubscriberInterruptedException(Throwable cause) {
+            super(cause);
+        }
+    }
+
     /**
      * An interface used to pass messages between {@link 
FanOutShardSubscription} and {@link
      * FanOutShardSubscriber} via the {@link BlockingQueue}.
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 48c7ef2..2cd5f33 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -55,6 +55,7 @@ import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -68,6 +69,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType.NONE;
@@ -86,28 +88,28 @@ import static org.mockito.Mockito.when;
 /** Tests for the {@link KinesisDataFetcher}. */
 public class KinesisDataFetcherTest extends TestLogger {
 
-    @Test(expected = RuntimeException.class)
-    public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
-        List<String> fakeStreams = new LinkedList<>();
-        fakeStreams.add("fakeStream1");
-        fakeStreams.add("fakeStream2");
+    @Test
+    public void testIsRunning() {
+        KinesisDataFetcher<String> fetcher =
+                createTestDataFetcherWithNoShards(10, 2, "test-stream");
 
-        HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
-                
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(
-                        fakeStreams);
+        assertTrue(fetcher.isRunning());
+    }
 
-        TestableKinesisDataFetcher<String> fetcher =
-                new TestableKinesisDataFetcher<>(
-                        fakeStreams,
-                        new TestSourceContext<>(),
-                        TestUtils.getStandardProperties(),
-                        new KinesisDeserializationSchemaWrapper<>(new 
SimpleStringSchema()),
-                        10,
-                        2,
-                        new AtomicReference<>(),
-                        new LinkedList<>(),
-                        subscribedStreamsToLastSeenShardIdsUnderTest,
-                        
FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour());
+    @Test
+    public void testIsRunningFalseAfterShutDown() {
+        KinesisDataFetcher<String> fetcher =
+                createTestDataFetcherWithNoShards(10, 2, "test-stream");
+
+        fetcher.shutdownFetcher();
+
+        assertFalse(fetcher.isRunning());
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
+        KinesisDataFetcher<String> fetcher =
+                createTestDataFetcherWithNoShards(10, 2, "fakeStream1", 
"fakeStream2");
 
         fetcher.runFetcher(); // this should throw RuntimeException
     }
@@ -1014,4 +1016,25 @@ public class KinesisDataFetcherTest extends TestLogger {
 
         verify(kinesisV2).close();
     }
+
+    private KinesisDataFetcher<String> createTestDataFetcherWithNoShards(
+            final int subtaskCount, final int subtaskIndex, final String... 
streams) {
+        List<String> streamList = 
Arrays.stream(streams).collect(Collectors.toList());
+
+        HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+                
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(
+                        streamList);
+
+        return new TestableKinesisDataFetcher<String>(
+                streamList,
+                new TestSourceContext<>(),
+                TestUtils.getStandardProperties(),
+                new KinesisDeserializationSchemaWrapper<String>(new 
SimpleStringSchema()),
+                subtaskCount,
+                subtaskIndex,
+                new AtomicReference<>(),
+                new LinkedList<>(),
+                subscribedStreamsToLastSeenShardIdsUnderTest,
+                
FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
index 5e4328d..04a6692 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOut
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SingleShardFanOutKinesisV2;
 
+import com.amazonaws.SdkClientException;
+import com.amazonaws.http.timers.client.SdkInterruptedException;
 import org.junit.Test;
 import software.amazon.awssdk.services.kinesis.model.StartingPosition;
 
@@ -34,6 +36,7 @@ import java.util.Properties;
 
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerTestUtils.fakeSequenceNumber;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM;
@@ -219,6 +222,51 @@ public class ShardConsumerFanOutTest {
                 kinesis.getStartingPositionForSubscription(4), "8");
     }
 
+    @Test
+    public void testShardConsumerExitsWhenRecordPublisherIsInterrupted() 
throws Exception {
+        // Throws error after 5 records
+        KinesisProxyV2Interface kinesis =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new SdkInterruptedException(null));
+
+        int expectedNumberOfRecordsReadFromKinesisBeforeError = 5;
+        SequenceNumber startingSequenceNumber = new SequenceNumber("0");
+        SequenceNumber expectedLastProcessSequenceNumber = new 
SequenceNumber("5");
+
+        // SdkInterruptedException terminates the consumer without retrying, so only the first 5
+        // records are read
+        ShardConsumerTestUtils.assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfRecordsReadFromKinesisBeforeError,
+                new FanOutRecordPublisherFactory(kinesis),
+                startingSequenceNumber,
+                efoProperties(),
+                expectedLastProcessSequenceNumber);
+    }
+
+    @Test
+    public void testShardConsumerRetriesGenericSdkError() throws Exception {
+        // Throws error after 5 records and there are 25 records available in 
the shard
+        KinesisProxyV2Interface kinesis =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new SdkClientException(""));
+
+        int expectedNumberOfRecordsReadFromKinesisBeforeError = 25;
+        SequenceNumber startingSequenceNumber = new SequenceNumber("0");
+
+        Properties properties = efoProperties();
+        // Speed up test by reducing backoff time
+        properties.setProperty(SUBSCRIBE_TO_SHARD_BACKOFF_MAX, "1");
+
+        // SdkClientException will cause a retry; each retry results in 5 more records being
+        // consumed.
+        // The consumer will eventually read all 25 records from the shard.
+        assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfRecordsReadFromKinesisBeforeError,
+                kinesis,
+                startingSequenceNumber,
+                properties);
+    }
+
     private void assertStartingPositionAfterSequenceNumber(
             final StartingPosition startingPosition, final String 
sequenceNumber) {
         assertEquals(AFTER_SEQUENCE_NUMBER, startingPosition.type());
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
index fe3f058..c34b60e 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java
@@ -58,6 +58,21 @@ public class ShardConsumerTestUtils {
             final SequenceNumber startingSequenceNumber,
             final Properties consumerProperties)
             throws InterruptedException {
+        return assertNumberOfMessagesReceivedFromKinesis(
+                expectedNumberOfMessages,
+                recordPublisherFactory,
+                startingSequenceNumber,
+                consumerProperties,
+                SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
+    }
+
+    public static <T> ShardConsumerMetricsReporter 
assertNumberOfMessagesReceivedFromKinesis(
+            final int expectedNumberOfMessages,
+            final RecordPublisherFactory recordPublisherFactory,
+            final SequenceNumber startingSequenceNumber,
+            final Properties consumerProperties,
+            final SequenceNumber expectedLastProcessedSequenceNum)
+            throws InterruptedException {
         ShardConsumerMetricsReporter shardMetricsReporter =
                 new ShardConsumerMetricsReporter(mock(MetricGroup.class));
 
@@ -118,7 +133,7 @@ public class ShardConsumerTestUtils {
 
         assertEquals(expectedNumberOfMessages, 
sourceContext.getCollectedOutputs().size());
         assertEquals(
-                SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
+                expectedLastProcessedSequenceNum,
                 
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
 
         return shardMetricsReporter;
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
index 07fec0d..45cba55 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherTest.java
@@ -19,6 +19,7 @@ package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout
 
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
 import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
@@ -28,6 +29,7 @@ import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOut
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.TestConsumer;
 
+import com.amazonaws.http.timers.client.SdkInterruptedException;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import io.netty.handler.timeout.ReadTimeoutException;
 import org.hamcrest.Matchers;
@@ -54,6 +56,7 @@ import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_EXPONENTIAL_CONSTANT;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_BACKOFF_MAX;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_RETRIES;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.CANCELLED;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
 import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
 import static 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
@@ -342,8 +345,7 @@ public class FanOutRecordPublisherTest {
                         backoff);
 
         int count = 0;
-        while (recordPublisher.run(new TestConsumer())
-                == RecordPublisher.RecordPublisherRunResult.INCOMPLETE) {
+        while (recordPublisher.run(new TestConsumer()) == 
RecordPublisherRunResult.INCOMPLETE) {
             if (++count > EXPECTED_SUBSCRIBE_TO_SHARD_RETRIES) {
                 break;
             }
@@ -488,6 +490,20 @@ public class FanOutRecordPublisherTest {
         }
     }
 
+    @Test
+    public void testInterruptedPublisherReturnsCancelled() throws Exception {
+        KinesisProxyV2Interface kinesis =
+                FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(
+                        new SdkInterruptedException(null));
+
+        RecordPublisher publisher =
+                createRecordPublisher(
+                        kinesis, 
StartingPosition.continueFromSequenceNumber(SEQUENCE_NUMBER));
+        RecordPublisherRunResult actual = publisher.run(new TestConsumer());
+
+        assertEquals(CANCELLED, actual);
+    }
+
     private List<UserRecord> flattenToUserRecords(final List<RecordBatch> 
recordBatch) {
         return recordBatch.stream()
                 .flatMap(b -> b.getDeaggregatedRecords().stream())
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index 3c6846e..0061cdc 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisFanOutBehavioursFactory.SubscriptionErrorKinesisV2;
 
+import com.amazonaws.http.timers.client.SdkInterruptedException;
 import io.netty.handler.timeout.ReadTimeoutException;
 import org.junit.Rule;
 import org.junit.Test;
@@ -66,6 +67,22 @@ public class FanOutShardSubscriberTest {
     }
 
     @Test
+    public void testInterruptedErrorThrownToConsumer() throws Exception {
+        
thrown.expect(FanOutShardSubscriber.FanOutSubscriberInterruptedException.class);
+
+        SdkInterruptedException error = new SdkInterruptedException(null);
+        SubscriptionErrorKinesisV2 errorKinesisV2 =
+                
FakeKinesisFanOutBehavioursFactory.errorDuringSubscription(error);
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber("consumerArn", "shardId", 
errorKinesisV2);
+
+        software.amazon.awssdk.services.kinesis.model.StartingPosition 
startingPosition =
+                
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder().build();
+        subscriber.subscribeToShardAndConsumeRecords(startingPosition, event 
-> {});
+    }
+
+    @Test
     public void testMultipleErrorsThrownPassesFirstErrorToConsumer() throws 
Exception {
         thrown.expect(FanOutShardSubscriber.FanOutSubscriberException.class);
         thrown.expectMessage("Error 1!");

Reply via email to