This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new c662ddd  [FLINK-39540][Connectors/Kinesis][6.x] Addressed bugs for EFO 
subscriptions when they are completed (#243)
c662ddd is described below

commit c662ddd044d7ed69d995a84a39045cc8181aba8c
Author: pelaezryan <[email protected]>
AuthorDate: Thu Apr 30 03:23:28 2026 -0700

    [FLINK-39540][Connectors/Kinesis][6.x] Addressed bugs for EFO subscriptions 
when they are completed (#243)
    
    * Removed resubscription for EFO subscriptions when they are completed
    
    * Added unit tests for Kinesis EFO subscriptions
    
    * Removed the cancel() call from onComplete() since it blocked subscriptions 
from restarting when necessary (e.g. after the 5-minute subscription timeout)
    
    * Addressed Flink resubscriptions for resharding/shard end and timeouts
    
    * Addressed revision comments and added Javadocs
    
    * Ran mvn spotless to address formatting
    
    ---------
    
    Co-authored-by: Ryan Pelaez <[email protected]>
---
 .../fanout/FanOutKinesisShardSubscription.java     |  87 +++++--
 .../fanout/FanOutKinesisShardSubscriptionTest.java | 287 +++++++++++++++++++++
 2 files changed, 358 insertions(+), 16 deletions(-)

diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
index a299e50..9674951 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
@@ -80,7 +80,6 @@ public class FanOutKinesisShardSubscription {
     // Queue is meant for eager retrieval of records from the Kinesis stream. 
We will always have 2
     // record batches available on next read.
     private final BlockingQueue<SubscribeToShardEvent> eventQueue = new 
LinkedBlockingQueue<>(2);
-    private final AtomicBoolean subscriptionActive = new AtomicBoolean(false);
     private final AtomicReference<Throwable> subscriptionException = new 
AtomicReference<>();
 
     // Store the current starting position for this subscription. Will be 
updated each time new
@@ -108,7 +107,8 @@ public class FanOutKinesisShardSubscription {
                 shardId,
                 startingPosition,
                 consumerArn);
-        if (subscriptionActive.get()) {
+        if (shardSubscriber != null
+                && shardSubscriber.getSubscriptionState() == 
SubscriptionState.SUBSCRIBED) {
             LOG.warn("Skipping activation of subscription since it is already 
active.");
             return;
         }
@@ -166,9 +166,9 @@ public class FanOutKinesisShardSubscription {
                                     shardId,
                                     startingPosition,
                                     consumerArn);
-                            subscriptionActive.set(true);
                             // Request first batch of records.
                             shardSubscriber.requestRecords();
+
                         } else {
                             String errorMessage =
                                     "Timeout when subscribing to shard "
@@ -236,16 +236,37 @@ public class FanOutKinesisShardSubscription {
             throw new KinesisStreamsSourceException(
                     "Subscription encountered unrecoverable exception.", 
throwable);
         }
+        final SubscriptionState state =
+                Optional.ofNullable(shardSubscriber)
+                        .map(FanOutShardSubscriber::getSubscriptionState)
+                        .orElse(SubscriptionState.NOT_STARTED);
 
-        if (!subscriptionActive.get()) {
-            LOG.debug(
-                    "Subscription to shard {} for consumer {} is not yet 
active. Skipping.",
-                    shardId,
-                    consumerArn);
-            return null;
+        switch (state) {
+            case NOT_STARTED:
+                LOG.debug(
+                        "Subscription to shard {} for consumer {} is not yet 
active. Skipping.",
+                        shardId,
+                        consumerArn);
+                return null;
+            case COMPLETED:
+                if (shardSubscriber.isShardEndReached()) {
+                    LOG.info(
+                            "Subscription reached SHARD_END for shard {} for 
consumer {}.",
+                            shardId,
+                            consumerArn);
+                    return null;
+                }
+                LOG.info(
+                        "Subscription expired to shard {} for consumer {}. 
Restarting.",
+                        shardId,
+                        consumerArn);
+                activateSubscription();
+                return null;
+            case SUBSCRIBED:
+                return eventQueue.poll();
+            default:
+                throw new IllegalStateException("Unknown subscription state: " 
+ state);
         }
-
-        return eventQueue.poll();
     }
 
     /**
@@ -254,26 +275,48 @@ public class FanOutKinesisShardSubscription {
      */
     private class FanOutShardSubscriber implements 
Subscriber<SubscribeToShardEventStream> {
         private final CountDownLatch subscriptionLatch;
-
         private Subscription subscription;
 
+        private final AtomicReference<SubscriptionState> subscriptionState =
+                new AtomicReference<>(SubscriptionState.NOT_STARTED);
+        private final AtomicBoolean isShardEnd = new AtomicBoolean(false);
+
         private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
             this.subscriptionLatch = subscriptionLatch;
         }
 
+        /**
+         * Fetch the state that the subscriber is in.
+         *
+         * @return Subscription state for the subscriber.
+         */
+        public SubscriptionState getSubscriptionState() {
+            return subscriptionState.get();
+        }
+
+        /**
+         * Boolean whether this subscriber has reached the end of a shard.
+         *
+         * @return True if ShardEnd. false otherwise.
+         */
+        public boolean isShardEndReached() {
+            return isShardEnd.get();
+        }
+
         public void requestRecords() {
             subscription.request(1);
         }
 
         public void cancel() {
-            if (!subscriptionActive.get()) {
+            if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
                 LOG.warn("Trying to cancel inactive subscription. Ignoring.");
                 return;
             }
-            subscriptionActive.set(false);
+
             if (subscription != null) {
                 subscription.cancel();
             }
+            this.subscriptionState.set(SubscriptionState.COMPLETED);
         }
 
         @Override
@@ -284,6 +327,7 @@ public class FanOutKinesisShardSubscription {
                     startingPosition,
                     consumerArn);
             this.subscription = subscription;
+            this.subscriptionState.set(SubscriptionState.SUBSCRIBED);
             subscriptionLatch.countDown();
         }
 
@@ -300,6 +344,11 @@ public class FanOutKinesisShardSubscription {
                                         event);
                                 eventQueue.put(event);
 
+                                if (event.continuationSequenceNumber() == 
null) {
+                                    isShardEnd.set(true);
+                                    return;
+                                }
+
                                 // Update the starting position in case we 
have to recreate the
                                 // subscription
                                 startingPosition =
@@ -330,8 +379,14 @@ public class FanOutKinesisShardSubscription {
         @Override
         public void onComplete() {
             LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
-            cancel();
-            activateSubscription();
+            this.subscriptionState.set(SubscriptionState.COMPLETED);
         }
     }
+
+    /** States that the {@code FanOutShardSubscriber} may be in. */
+    private enum SubscriptionState {
+        NOT_STARTED,
+        SUBSCRIBED,
+        COMPLETED
+    }
 }
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
new file mode 100644
index 0000000..b6d66b7
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader.fanout;
+
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+import 
org.apache.flink.connector.kinesis.source.util.FakeKinesisFanOutBehaviorsFactory;
+
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN;
+import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link FanOutKinesisShardSubscription}. */
+class FanOutKinesisShardSubscriptionTest {
+
+    private static final String TEST_SHARD_ID = generateShardId(1);
+    private static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(5);
+
+    @Test
+    void testNextEventReturnsNullBeforeActivation() {
+        AsyncStreamProxy proxy = 
FakeKinesisFanOutBehaviorsFactory.boundedShard().build();
+        FanOutKinesisShardSubscription subscription =
+                new FanOutKinesisShardSubscription(
+                        proxy,
+                        CONSUMER_ARN,
+                        TEST_SHARD_ID,
+                        StartingPosition.fromStart(),
+                        SUBSCRIPTION_TIMEOUT);
+
+        assertThat(subscription.nextEvent()).isNull();
+    }
+
+    @Test
+    void testResourceNotFoundExceptionThrown() {
+        AsyncStreamProxy proxy =
+                
FakeKinesisFanOutBehaviorsFactory.resourceNotFoundWhenObtainingSubscription();
+        FanOutKinesisShardSubscription subscription =
+                new FanOutKinesisShardSubscription(
+                        proxy,
+                        CONSUMER_ARN,
+                        TEST_SHARD_ID,
+                        StartingPosition.fromStart(),
+                        SUBSCRIPTION_TIMEOUT);
+
+        subscription.activateSubscription();
+
+        // Poll until exception surfaces
+        assertThatThrownBy(
+                        () -> {
+                            for (int i = 0; i < 200; i++) {
+                                subscription.nextEvent();
+                                Thread.sleep(50);
+                            }
+                        })
+                .isInstanceOf(ResourceNotFoundException.class);
+    }
+
+    @Test
+    void testUnrecoverableExceptionWrappedInSourceException() throws Exception 
{
+        AsyncStreamProxy proxy =
+                new AsyncStreamProxy() {
+                    @Override
+                    public CompletableFuture<Void> subscribeToShard(
+                            String consumerArn,
+                            String shardId,
+                            StartingPosition startingPosition,
+                            SubscribeToShardResponseHandler responseHandler) {
+                        responseHandler.exceptionOccurred(
+                                new IllegalStateException("unrecoverable"));
+                        return CompletableFuture.completedFuture(null);
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+        FanOutKinesisShardSubscription subscription =
+                new FanOutKinesisShardSubscription(
+                        proxy,
+                        CONSUMER_ARN,
+                        TEST_SHARD_ID,
+                        StartingPosition.fromStart(),
+                        SUBSCRIPTION_TIMEOUT);
+
+        subscription.activateSubscription();
+
+        assertThatThrownBy(
+                        () -> {
+                            for (int i = 0; i < 200; i++) {
+                                subscription.nextEvent();
+                                Thread.sleep(50);
+                            }
+                        })
+                .isInstanceOf(KinesisStreamsSourceException.class)
+                .hasMessageContaining("unrecoverable");
+    }
+
+    @Test
+    void testSubscriptionTimeoutTerminatesSubscription() throws Exception {
+        AsyncStreamProxy proxy =
+                new AsyncStreamProxy() {
+                    @Override
+                    public CompletableFuture<Void> subscribeToShard(
+                            String consumerArn,
+                            String shardId,
+                            StartingPosition startingPosition,
+                            SubscribeToShardResponseHandler responseHandler) {
+                        return new CompletableFuture<>();
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+        FanOutKinesisShardSubscription subscription =
+                new FanOutKinesisShardSubscription(
+                        proxy,
+                        CONSUMER_ARN,
+                        TEST_SHARD_ID,
+                        StartingPosition.fromStart(),
+                        Duration.ofMillis(200));
+
+        subscription.activateSubscription();
+
+        // Wait for timeout to trigger, then poll - should recover
+        Thread.sleep(500);
+        SubscribeToShardEvent event = subscription.nextEvent();
+        assertThat(event).isNull();
+    }
+
+    @Test
+    void testExpiredSubscriptionResubscribes() throws Exception {
+        AtomicInteger subscribeCount = new AtomicInteger(0);
+        AsyncStreamProxy proxy =
+                new AsyncStreamProxy() {
+                    @Override
+                    public CompletableFuture<Void> subscribeToShard(
+                            String consumerArn,
+                            String shardId,
+                            StartingPosition startingPosition,
+                            SubscribeToShardResponseHandler responseHandler) {
+                        subscribeCount.incrementAndGet();
+                        return CompletableFuture.supplyAsync(
+                                () -> {
+                                    responseHandler.responseReceived(
+                                            
SubscribeToShardResponse.builder().build());
+                                    responseHandler.onEventStream(
+                                            subscriber -> {
+                                                subscriber.onSubscribe(
+                                                        new Subscription() {
+                                                            @Override
+                                                            public void 
request(long n) {
+                                                                // Complete 
without sending any
+                                                                // events 
(simulates 5-min expiry)
+                                                                
subscriber.onComplete();
+                                                            }
+
+                                                            @Override
+                                                            public void 
cancel() {}
+                                                        });
+                                            });
+                                    return null;
+                                });
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+
+        FanOutKinesisShardSubscription subscription =
+                new FanOutKinesisShardSubscription(
+                        proxy,
+                        CONSUMER_ARN,
+                        TEST_SHARD_ID,
+                        StartingPosition.fromStart(),
+                        SUBSCRIPTION_TIMEOUT);
+
+        subscription.activateSubscription();
+        Thread.sleep(500);
+
+        // nextEvent() should detect COMPLETED without shard-end and trigger 
resubscription
+        subscription.nextEvent();
+        Thread.sleep(500);
+
+        assertThat(subscribeCount.get()).isEqualTo(2);
+    }
+
+    @Test
+    void testShardEndDoesNotResubscribe() throws Exception {
+        AtomicInteger subscribeCount = new AtomicInteger(0);
+        AsyncStreamProxy proxy =
+                new AsyncStreamProxy() {
+                    @Override
+                    public CompletableFuture<Void> subscribeToShard(
+                            String consumerArn,
+                            String shardId,
+                            StartingPosition startingPosition,
+                            SubscribeToShardResponseHandler responseHandler) {
+                        subscribeCount.incrementAndGet();
+                        return CompletableFuture.supplyAsync(
+                                () -> {
+                                    responseHandler.responseReceived(
+                                            
SubscribeToShardResponse.builder().build());
+                                    responseHandler.onEventStream(
+                                            subscriber -> {
+                                                subscriber.onSubscribe(
+                                                        new Subscription() {
+                                                            private boolean 
sent = false;
+
+                                                            @Override
+                                                            public void 
request(long n) {
+                                                                if (!sent) {
+                                                                    sent = 
true;
+                                                                    // Send 
event with null
+                                                                    // 
continuation (shard end)
+                                                                    
subscriber.onNext(
+                                                                            
SubscribeToShardEvent
+                                                                               
     .builder()
+                                                                               
     .millisBehindLatest(
+                                                                               
             0L)
+                                                                               
     .continuationSequenceNumber(
+                                                                               
             null)
+                                                                               
     .build());
+                                                                } else {
+                                                                    
subscriber.onComplete();
+                                                                }
+                                                            }
+
+                                                            @Override
+                                                            public void 
cancel() {}
+                                                        });
+                                            });
+                                    return null;
+                                });
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+
+        FanOutKinesisShardSubscription subscription =
+                new FanOutKinesisShardSubscription(
+                        proxy,
+                        CONSUMER_ARN,
+                        TEST_SHARD_ID,
+                        StartingPosition.fromStart(),
+                        SUBSCRIPTION_TIMEOUT);
+
+        subscription.activateSubscription();
+        Thread.sleep(500);
+
+        // Drain the shard-end event from the queue
+        subscription.nextEvent();
+        Thread.sleep(500);
+
+        // Should not have resubscribed — shard has ended
+        assertThat(subscribeCount.get()).isEqualTo(1);
+        assertThat(subscription.nextEvent()).isNull();
+    }
+}

Reply via email to