chenylee-aws commented on code in PR #247:
URL: 
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3239583709


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -102,100 +117,132 @@ public FanOutKinesisShardSubscription(
 
     /** Method to allow eager activation of the subscription. */
     public void activateSubscription() {
-        LOG.info(
-                "Activating subscription to shard {} with starting position {} 
for consumer {}.",
-                shardId,
-                startingPosition,
-                consumerArn);
-        if (shardSubscriber != null
-                && shardSubscriber.getSubscriptionState() == 
SubscriptionState.SUBSCRIBED) {
-            LOG.warn("Skipping activation of subscription since it is already 
active.");
-            return;
-        }
+        synchronized (lockObject) {
+            if (closed) {
+                LOG.info(
+                        "Subscription for shard {} is closed; skipping 
activation.",
+                        shardId);
+                return;
+            }
+            if (startingPosition == null) {
+                LOG.info(
+                        "Shard {} has been completely consumed (shard end). 
Skipping re-subscription.",
+                        shardId);
+                return;
+            }
+            if (shardSubscriber != null) {
+                LOG.warn(
+                        "Shard {} Skipping activation of subscription since 
one is already active or in progress.",
+                        shardId);
+                return;
+            }
 
-        // We have to use our own CountDownLatch to wait for subscription to 
be acquired because
-        // subscription event is tracked via the handler.
-        CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
-        shardSubscriber = new FanOutShardSubscriber(waitForSubscriptionLatch);
-        SubscribeToShardResponseHandler responseHandler =
-                SubscribeToShardResponseHandler.builder()
-                        .subscriber(() -> shardSubscriber)
-                        .onError(
-                                throwable -> {
-                                    // Errors that occur when obtaining a 
subscription are thrown
-                                    // here.
-                                    // After subscription is acquired, these 
errors can be ignored.
-                                    if (waitForSubscriptionLatch.getCount() > 
0) {
-                                        terminateSubscription(throwable);
-                                        waitForSubscriptionLatch.countDown();
+            LOG.info(
+                    "Activating subscription to shard {} with starting 
position {} for consumer {}.",
+                    shardId,
+                    startingPosition,
+                    consumerArn);
+
+            FanOutShardSubscriber subscriber = new FanOutShardSubscriber();
+            shardSubscriber = subscriber;
+
+            SubscribeToShardResponseHandler responseHandler =
+                    SubscribeToShardResponseHandler.builder()
+                            .subscriber(() -> subscriber)
+                            .onError(
+                                    throwable -> {
+                                        synchronized (lockObject) {
+                                            if (!disposeIfActive(subscriber)) {
+                                                return;
+                                            }
+                                        }
+                                        LOG.error(
+                                                "Error onError subscribing to 
shard {} with "
+                                                        + "starting position 
{} for consumer {}.",
+                                                shardId,
+                                                startingPosition,
+                                                consumerArn,
+                                                throwable);
+                                        setSubscriptionException(throwable);
+                                    })
+                            .build();
+
+            cancelTimeoutFuture();
+            timeoutFuture =
+                    TIMEOUT_SCHEDULER.schedule(
+                            () -> {
+                                String errorMessage =
+                                        "Timeout when subscribing to shard "
+                                                + shardId
+                                                + " with starting position "
+                                                + startingPosition
+                                                + " for consumer "
+                                                + consumerArn
+                                                + ".";
+                                synchronized (lockObject) {
+                                    // The timeout future was cancelled 
between firing and
+                                    // acquiring the lock (e.g. onSubscribe 
succeeded, or another
+                                    // error path disposed the subscriber). Do 
nothing.
+                                    if (timeoutFuture == null) {
+                                        return;
+                                    }
+                                    if (!disposeIfActive(subscriber)) {
+                                        return;
                                     }
-                                })
-                        .build();
-
-        // We don't need to keep track of the future here because we monitor 
subscription success
-        // using our own CountDownLatch
-        kinesis.subscribeToShard(consumerArn, shardId, startingPosition, 
responseHandler)
-                .exceptionally(
-                        throwable -> {
-                            // If consumer exists and is still activating, we 
want to countdown.
-                            if (ExceptionUtils.findThrowable(
-                                            throwable, 
ResourceInUseException.class)
-                                    .isPresent()) {
-                                waitForSubscriptionLatch.countDown();
+                                }
+                                LOG.error(errorMessage);
+                                setSubscriptionException(new 
TimeoutException(errorMessage));
+                            },
+                            subscriptionTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+
+            kinesis.subscribeToShard(consumerArn, shardId, startingPosition, 
responseHandler)
+                    .exceptionally(
+                            throwable -> {
+                                synchronized (lockObject) {
+                                    if (!disposeIfActive(subscriber)) {
+                                        return null;
+                                    }
+                                }
+                                LOG.error(
+                                        "Error exceptionally subscribing to 
shard {} with starting position {} for "
+                                                + "consumer {}.",
+                                        shardId,
+                                        startingPosition,
+                                        consumerArn,
+                                        throwable);
+                                setSubscriptionException(throwable);
                                 return null;
-                            }
-                            LOG.error(
-                                    "Error subscribing to shard {} with 
starting position {} for consumer {}.",
-                                    shardId,
-                                    startingPosition,
-                                    consumerArn,
-                                    throwable);
-                            terminateSubscription(throwable);
-                            return null;
-                        });
-
-        // We have to handle timeout for subscriptions separately because Java 
8 does not support a
-        // fluent orTimeout() methods on CompletableFuture.
-        CompletableFuture.runAsync(
-                () -> {
-                    try {
-                        if (waitForSubscriptionLatch.await(
-                                subscriptionTimeout.toMillis(), 
TimeUnit.MILLISECONDS)) {
-                            LOG.info(
-                                    "Successfully subscribed to shard {} with 
starting position {} for consumer {}.",
-                                    shardId,
-                                    startingPosition,
-                                    consumerArn);
-                            // Request first batch of records.
-                            shardSubscriber.requestRecords();
-
-                        } else {
-                            String errorMessage =
-                                    "Timeout when subscribing to shard "
-                                            + shardId
-                                            + " with starting position "
-                                            + startingPosition
-                                            + " for consumer "
-                                            + consumerArn
-                                            + ".";
-                            LOG.error(errorMessage);
-                            terminateSubscription(new 
TimeoutException(errorMessage));
-                        }
-                    } catch (InterruptedException e) {
-                        LOG.warn("Interrupted while waiting for subscription 
to complete.", e);
-                        terminateSubscription(e);
-                        Thread.currentThread().interrupt();
-                    }
-                });
+                            });
+        }
+    }
+
+    // Must be called while holding lockObject
+    private void cancelTimeoutFuture() {
+        if (timeoutFuture != null) {
+            timeoutFuture.cancel(false);
+            timeoutFuture = null;
+        }
+    }
+
+    // Must be called while holding lockObject
+    private boolean disposeIfActive(FanOutShardSubscriber subscriber) {
+        if (shardSubscriber != subscriber) {

Review Comment:
   Yes, it's essential. Each error callback (`responseHandler.onError`, 
`.exceptionally`, timeout scheduler, `Subscriber.onError`) captures its own 
`FanOutShardSubscriber` in the closure at activation time. By the time the 
callback fires, `shardSubscriber` may already point at a different subscriber — 
e.g., an EFO 5-minute `onComplete → activateSubscription` rotation may have 
replaced it. Without the identity check, a stale callback would dispose the new 
subscriber. The check also deduplicates the three or four error paths that fire 
for the same failure (the SDK delivers the same error through multiple layers): 
only the winning path does teardown; the rest are clean no-ops.



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -274,61 +300,52 @@ public SubscribeToShardEvent nextEvent() {
      * Streams.
      */
     private class FanOutShardSubscriber implements 
Subscriber<SubscribeToShardEventStream> {
-        private final CountDownLatch subscriptionLatch;
-        private Subscription subscription;
-
-        private final AtomicReference<SubscriptionState> subscriptionState =
-                new AtomicReference<>(SubscriptionState.NOT_STARTED);
-        private final AtomicBoolean isShardEnd = new AtomicBoolean(false);
-
-        private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
-            this.subscriptionLatch = subscriptionLatch;
-        }
-
-        /**
-         * Fetch the state that the subscriber is in.
-         *
-         * @return Subscription state for the subscriber.
-         */
-        public SubscriptionState getSubscriptionState() {
-            return subscriptionState.get();
-        }
 
-        /**
-         * Boolean whether this subscriber has reached the end of a shard.
-         *
-         * @return True if ShardEnd. false otherwise.
-         */
-        public boolean isShardEndReached() {
-            return isShardEnd.get();
-        }
+        private Subscription subscription;
 
         public void requestRecords() {
-            subscription.request(1);
-        }
-
-        public void cancel() {
-            if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
-                LOG.warn("Trying to cancel inactive subscription. Ignoring.");
-                return;
+            // subscription can be null if onSubscribe has not yet fired on a 
freshly activated
+            // subscriber. In that case the initial request(1) will be issued 
from onSubscribe
+            // itself, so it is safe to skip here.
+            if (subscription != null) {
+                subscription.request(1);
             }
+        }
 
+        public void cancelSubscription() {
             if (subscription != null) {
                 subscription.cancel();
             }
-            this.subscriptionState.set(SubscriptionState.COMPLETED);
         }
 
         @Override
         public void onSubscribe(Subscription subscription) {
-            LOG.info(
-                    "Successfully subscribed to shard {} at {} using consumer 
{}.",
-                    shardId,
-                    startingPosition,
-                    consumerArn);
-            this.subscription = subscription;
-            this.subscriptionState.set(SubscriptionState.SUBSCRIBED);
-            subscriptionLatch.countDown();
+            synchronized (lockObject) {
+                if (shardSubscriber != this) {
+                    // Timeout/error disposed this subscriber and a new one 
was created before SDK
+                    // called onSubscribe
+                    subscription.cancel();

Review Comment:
   Yes. The `Subscription` parameter is the reactive-streams handle for this 
specific `subscribeToShard` call — it's bound one-to-one with this 
`FanOutShardSubscriber` instance (the SDK creates a fresh one per call and 
invokes `onSubscribe` with it exactly once, per Reactive Streams rules 1.9 and 2.12). 
The replacement subscriber has its own `subscribeToShard` call producing its 
own distinct `Subscription`. Cancelling this stale one releases this particular 
HTTP/2 stream slot without affecting the new subscriber's stream.
     
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to