This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new c662ddd [FLINK-39540][Connectors/Kinesis][6.x] Addressed bugs for EFO
subscriptions when they are completed (#243)
c662ddd is described below
commit c662ddd044d7ed69d995a84a39045cc8181aba8c
Author: pelaezryan <[email protected]>
AuthorDate: Thu Apr 30 03:23:28 2026 -0700
[FLINK-39540][Connectors/Kinesis][6.x] Addressed bugs for EFO subscriptions
when they are completed (#243)
* Removed resubscription from onComplete() when EFO subscriptions complete
* Added unit tests for Kinesis EFO subscriptions
* Removed cancel() since it blocked subscriptions from restarting when
necessary (e.g. after the 5-minute subscription timeout)
* Handled Flink resubscriptions for resharding (SHARD_END) and timeouts
* Addressed review comments and added Javadoc
* Ran mvn spotless to fix formatting
---------
Co-authored-by: Ryan Pelaez <[email protected]>
---
.../fanout/FanOutKinesisShardSubscription.java | 87 +++++--
.../fanout/FanOutKinesisShardSubscriptionTest.java | 287 +++++++++++++++++++++
2 files changed, 358 insertions(+), 16 deletions(-)
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
index a299e50..9674951 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
@@ -80,7 +80,6 @@ public class FanOutKinesisShardSubscription {
// Queue is meant for eager retrieval of records from the Kinesis stream.
We will always have 2
// record batches available on next read.
private final BlockingQueue<SubscribeToShardEvent> eventQueue = new
LinkedBlockingQueue<>(2);
- private final AtomicBoolean subscriptionActive = new AtomicBoolean(false);
private final AtomicReference<Throwable> subscriptionException = new
AtomicReference<>();
// Store the current starting position for this subscription. Will be
updated each time new
@@ -108,7 +107,8 @@ public class FanOutKinesisShardSubscription {
shardId,
startingPosition,
consumerArn);
- if (subscriptionActive.get()) {
+ if (shardSubscriber != null
+ && shardSubscriber.getSubscriptionState() ==
SubscriptionState.SUBSCRIBED) {
LOG.warn("Skipping activation of subscription since it is already
active.");
return;
}
@@ -166,9 +166,9 @@ public class FanOutKinesisShardSubscription {
shardId,
startingPosition,
consumerArn);
- subscriptionActive.set(true);
// Request first batch of records.
shardSubscriber.requestRecords();
+
} else {
String errorMessage =
"Timeout when subscribing to shard "
@@ -236,16 +236,37 @@ public class FanOutKinesisShardSubscription {
throw new KinesisStreamsSourceException(
"Subscription encountered unrecoverable exception.",
throwable);
}
+ final SubscriptionState state =
+ Optional.ofNullable(shardSubscriber)
+ .map(FanOutShardSubscriber::getSubscriptionState)
+ .orElse(SubscriptionState.NOT_STARTED);
- if (!subscriptionActive.get()) {
- LOG.debug(
- "Subscription to shard {} for consumer {} is not yet
active. Skipping.",
- shardId,
- consumerArn);
- return null;
+ switch (state) {
+ case NOT_STARTED:
+ LOG.debug(
+ "Subscription to shard {} for consumer {} is not yet
active. Skipping.",
+ shardId,
+ consumerArn);
+ return null;
+ case COMPLETED:
+ if (shardSubscriber.isShardEndReached()) {
+ LOG.info(
+ "Subscription reached SHARD_END for shard {} for
consumer {}.",
+ shardId,
+ consumerArn);
+ return null;
+ }
+ LOG.info(
+ "Subscription expired to shard {} for consumer {}.
Restarting.",
+ shardId,
+ consumerArn);
+ activateSubscription();
+ return null;
+ case SUBSCRIBED:
+ return eventQueue.poll();
+ default:
+ throw new IllegalStateException("Unknown subscription state: "
+ state);
}
-
- return eventQueue.poll();
}
/**
@@ -254,26 +275,48 @@ public class FanOutKinesisShardSubscription {
*/
private class FanOutShardSubscriber implements
Subscriber<SubscribeToShardEventStream> {
private final CountDownLatch subscriptionLatch;
-
private Subscription subscription;
+ private final AtomicReference<SubscriptionState> subscriptionState =
+ new AtomicReference<>(SubscriptionState.NOT_STARTED);
+ private final AtomicBoolean isShardEnd = new AtomicBoolean(false);
+
private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
this.subscriptionLatch = subscriptionLatch;
}
+ /**
+ * Returns the current lifecycle state of this subscriber (updated by onSubscribe(), onComplete() and cancel()).
+ *
+ * @return the current {@link SubscriptionState}; starts as NOT_STARTED and is never null.
+ */
+ public SubscriptionState getSubscriptionState() {
+ return subscriptionState.get();
+ }
+
+ /**
+ * Returns whether onNext() has received an event with a null continuation sequence number (shard end).
+ *
+ * @return {@code true} once the shard end has been reached, {@code false} otherwise.
+ */
+ public boolean isShardEndReached() {
+ return isShardEnd.get();
+ }
+
public void requestRecords() {
subscription.request(1);
}
public void cancel() {
- if (!subscriptionActive.get()) {
+ if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
LOG.warn("Trying to cancel inactive subscription. Ignoring.");
return;
}
- subscriptionActive.set(false);
+
if (subscription != null) {
subscription.cancel();
}
+ this.subscriptionState.set(SubscriptionState.COMPLETED);
}
@Override
@@ -284,6 +327,7 @@ public class FanOutKinesisShardSubscription {
startingPosition,
consumerArn);
this.subscription = subscription;
+ this.subscriptionState.set(SubscriptionState.SUBSCRIBED);
subscriptionLatch.countDown();
}
@@ -300,6 +344,11 @@ public class FanOutKinesisShardSubscription {
event);
eventQueue.put(event);
+ if (event.continuationSequenceNumber() ==
null) {
+ isShardEnd.set(true);
+ return;
+ }
+
// Update the starting position in case we
have to recreate the
// subscription
startingPosition =
@@ -330,8 +379,14 @@ public class FanOutKinesisShardSubscription {
@Override
public void onComplete() {
LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
- cancel();
- activateSubscription();
+ this.subscriptionState.set(SubscriptionState.COMPLETED);
}
}
+
+ /** Lifecycle states of a {@code FanOutShardSubscriber}; nextEvent() uses them to decide whether to skip, poll, or resubscribe. */
+ private enum SubscriptionState {
+ NOT_STARTED, // initial state, before onSubscribe() has been received
+ SUBSCRIBED, // set in onSubscribe() once the reactive subscription is established
+ COMPLETED // set by onComplete() or cancel()
+ }
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
new file mode 100644
index 0000000..b6d66b7
--- /dev/null
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader.fanout;
+
+import
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+import
org.apache.flink.connector.kinesis.source.util.FakeKinesisFanOutBehaviorsFactory;
+
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
+import
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN;
+import static
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link FanOutKinesisShardSubscription}: activation, polling, exception propagation and resubscription after completion. */
+class FanOutKinesisShardSubscriptionTest {
+
+ private static final String TEST_SHARD_ID = generateShardId(1);
+ private static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(5);
+
+ @Test
+ void testNextEventReturnsNullBeforeActivation() { // no subscriber exists yet, so the state is treated as NOT_STARTED
+ AsyncStreamProxy proxy =
FakeKinesisFanOutBehaviorsFactory.boundedShard().build();
+ FanOutKinesisShardSubscription subscription =
+ new FanOutKinesisShardSubscription(
+ proxy,
+ CONSUMER_ARN,
+ TEST_SHARD_ID,
+ StartingPosition.fromStart(),
+ SUBSCRIPTION_TIMEOUT);
+
+ assertThat(subscription.nextEvent()).isNull();
+ }
+
+ @Test
+ void testResourceNotFoundExceptionThrown() {
+ AsyncStreamProxy proxy =
+
FakeKinesisFanOutBehaviorsFactory.resourceNotFoundWhenObtainingSubscription();
+ FanOutKinesisShardSubscription subscription =
+ new FanOutKinesisShardSubscription(
+ proxy,
+ CONSUMER_ARN,
+ TEST_SHARD_ID,
+ StartingPosition.fromStart(),
+ SUBSCRIPTION_TIMEOUT);
+
+ subscription.activateSubscription();
+
+ // Poll for up to ~10s (200 x 50ms) until nextEvent() surfaces the ResourceNotFoundException
+ assertThatThrownBy(
+ () -> {
+ for (int i = 0; i < 200; i++) {
+ subscription.nextEvent();
+ Thread.sleep(50);
+ }
+ })
+ .isInstanceOf(ResourceNotFoundException.class);
+ }
+
+ @Test
+ void testUnrecoverableExceptionWrappedInSourceException() throws Exception
{
+ AsyncStreamProxy proxy =
+ new AsyncStreamProxy() {
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ String consumerArn,
+ String shardId,
+ StartingPosition startingPosition,
+ SubscribeToShardResponseHandler responseHandler) {
+ responseHandler.exceptionOccurred(
+ new IllegalStateException("unrecoverable"));
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void close() {}
+ };
+ FanOutKinesisShardSubscription subscription =
+ new FanOutKinesisShardSubscription(
+ proxy,
+ CONSUMER_ARN,
+ TEST_SHARD_ID,
+ StartingPosition.fromStart(),
+ SUBSCRIPTION_TIMEOUT);
+
+ subscription.activateSubscription(); // the fake proxy reports the exception synchronously via exceptionOccurred()
+
+ assertThatThrownBy(
+ () -> {
+ for (int i = 0; i < 200; i++) {
+ subscription.nextEvent();
+ Thread.sleep(50);
+ }
+ })
+ .isInstanceOf(KinesisStreamsSourceException.class)
+ .hasMessageContaining("unrecoverable");
+ }
+
+ @Test
+ void testSubscriptionTimeoutTerminatesSubscription() throws Exception {
+ AsyncStreamProxy proxy =
+ new AsyncStreamProxy() {
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ String consumerArn,
+ String shardId,
+ StartingPosition startingPosition,
+ SubscribeToShardResponseHandler responseHandler) {
+ return new CompletableFuture<>(); // never completes and never invokes the handler, so onSubscribe() is never called
+ }
+
+ @Override
+ public void close() {}
+ };
+ FanOutKinesisShardSubscription subscription =
+ new FanOutKinesisShardSubscription(
+ proxy,
+ CONSUMER_ARN,
+ TEST_SHARD_ID,
+ StartingPosition.fromStart(),
+ Duration.ofMillis(200));
+
+ subscription.activateSubscription();
+
+ // Wait past the 200ms subscription timeout, then poll. NOTE(review): this only asserts null; it does not verify that the subscription recovers or resubscribes.
+ Thread.sleep(500);
+ SubscribeToShardEvent event = subscription.nextEvent();
+ assertThat(event).isNull();
+ }
+
+ @Test
+ void testExpiredSubscriptionResubscribes() throws Exception {
+ AtomicInteger subscribeCount = new AtomicInteger(0);
+ AsyncStreamProxy proxy =
+ new AsyncStreamProxy() {
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ String consumerArn,
+ String shardId,
+ StartingPosition startingPosition,
+ SubscribeToShardResponseHandler responseHandler) {
+ subscribeCount.incrementAndGet();
+ return CompletableFuture.supplyAsync(
+ () -> {
+ responseHandler.responseReceived(
+
SubscribeToShardResponse.builder().build());
+ responseHandler.onEventStream(
+ subscriber -> {
+ subscriber.onSubscribe(
+ new Subscription() {
+ @Override
+ public void
request(long n) {
+ // Complete
without sending any
+ // events
(simulates 5-min expiry)
+
subscriber.onComplete();
+ }
+
+ @Override
+ public void
cancel() {}
+ });
+ });
+ return null;
+ });
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ FanOutKinesisShardSubscription subscription =
+ new FanOutKinesisShardSubscription(
+ proxy,
+ CONSUMER_ARN,
+ TEST_SHARD_ID,
+ StartingPosition.fromStart(),
+ SUBSCRIPTION_TIMEOUT);
+
+ subscription.activateSubscription();
+ Thread.sleep(500); // NOTE(review): sleep-based synchronization with the async fake; may be flaky on slow CI
+
+ // nextEvent() should detect COMPLETED without shard-end and trigger
resubscription
+ subscription.nextEvent();
+ Thread.sleep(500);
+
+ assertThat(subscribeCount.get()).isEqualTo(2);
+ }
+
+ @Test
+ void testShardEndDoesNotResubscribe() throws Exception {
+ AtomicInteger subscribeCount = new AtomicInteger(0);
+ AsyncStreamProxy proxy =
+ new AsyncStreamProxy() {
+ @Override
+ public CompletableFuture<Void> subscribeToShard(
+ String consumerArn,
+ String shardId,
+ StartingPosition startingPosition,
+ SubscribeToShardResponseHandler responseHandler) {
+ subscribeCount.incrementAndGet();
+ return CompletableFuture.supplyAsync(
+ () -> {
+ responseHandler.responseReceived(
+
SubscribeToShardResponse.builder().build());
+ responseHandler.onEventStream(
+ subscriber -> {
+ subscriber.onSubscribe(
+ new Subscription() {
+ private boolean
sent = false; // NOTE(review): not volatile; assumes request() is never called concurrently from different threads
+
+ @Override
+ public void
request(long n) {
+ if (!sent) {
+ sent =
true;
+ // Send
event with null
+ //
continuation (shard end)
+
subscriber.onNext(
+
SubscribeToShardEvent
+
.builder()
+
.millisBehindLatest(
+
0L)
+
.continuationSequenceNumber(
+
null)
+
.build());
+ } else {
+
subscriber.onComplete();
+ }
+ }
+
+ @Override
+ public void
cancel() {}
+ });
+ });
+ return null;
+ });
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ FanOutKinesisShardSubscription subscription =
+ new FanOutKinesisShardSubscription(
+ proxy,
+ CONSUMER_ARN,
+ TEST_SHARD_ID,
+ StartingPosition.fromStart(),
+ SUBSCRIPTION_TIMEOUT);
+
+ subscription.activateSubscription();
+ Thread.sleep(500);
+
+ // Drain the shard-end event from the queue. NOTE(review): onComplete() is only reached on a second request(), which presumably never happens after onNext() returns early on a null continuation, so the COMPLETED + SHARD_END branch of nextEvent() (which returns null without polling eventQueue) is not exercised here.
+ subscription.nextEvent();
+ Thread.sleep(500);
+
+ // Should not have resubscribed — shard has ended
+ assertThat(subscribeCount.get()).isEqualTo(1);
+ assertThat(subscription.nextEvent()).isNull();
+ }
+}