pelaezryan commented on code in PR #247:
URL: 
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3229675147


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -102,100 +117,132 @@ public FanOutKinesisShardSubscription(
 
     /** Method to allow eager activation of the subscription. */
     public void activateSubscription() {
-        LOG.info(
-                "Activating subscription to shard {} with starting position {} 
for consumer {}.",
-                shardId,
-                startingPosition,
-                consumerArn);
-        if (shardSubscriber != null
-                && shardSubscriber.getSubscriptionState() == 
SubscriptionState.SUBSCRIBED) {
-            LOG.warn("Skipping activation of subscription since it is already 
active.");
-            return;
-        }
+        synchronized (lockObject) {
+            if (closed) {
+                LOG.info(
+                        "Subscription for shard {} is closed; skipping 
activation.",
+                        shardId);
+                return;
+            }
+            if (startingPosition == null) {
+                LOG.info(
+                        "Shard {} has been completely consumed (shard end). 
Skipping re-subscription.",
+                        shardId);
+                return;
+            }
+            if (shardSubscriber != null) {
+                LOG.warn(
+                        "Shard {} Skipping activation of subscription since 
one is already active or in progress.",
+                        shardId);
+                return;
+            }
 
-        // We have to use our own CountDownLatch to wait for subscription to 
be acquired because
-        // subscription event is tracked via the handler.
-        CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
-        shardSubscriber = new FanOutShardSubscriber(waitForSubscriptionLatch);
-        SubscribeToShardResponseHandler responseHandler =
-                SubscribeToShardResponseHandler.builder()
-                        .subscriber(() -> shardSubscriber)
-                        .onError(
-                                throwable -> {
-                                    // Errors that occur when obtaining a 
subscription are thrown
-                                    // here.
-                                    // After subscription is acquired, these 
errors can be ignored.
-                                    if (waitForSubscriptionLatch.getCount() > 
0) {
-                                        terminateSubscription(throwable);
-                                        waitForSubscriptionLatch.countDown();
+            LOG.info(
+                    "Activating subscription to shard {} with starting 
position {} for consumer {}.",
+                    shardId,
+                    startingPosition,
+                    consumerArn);
+
+            FanOutShardSubscriber subscriber = new FanOutShardSubscriber();
+            shardSubscriber = subscriber;
+
+            SubscribeToShardResponseHandler responseHandler =
+                    SubscribeToShardResponseHandler.builder()
+                            .subscriber(() -> subscriber)
+                            .onError(
+                                    throwable -> {
+                                        synchronized (lockObject) {
+                                            if (!disposeIfActive(subscriber)) {
+                                                return;
+                                            }
+                                        }
+                                        LOG.error(
+                                                "Error onError subscribing to 
shard {} with "
+                                                        + "starting position 
{} for consumer {}.",
+                                                shardId,
+                                                startingPosition,
+                                                consumerArn,
+                                                throwable);
+                                        setSubscriptionException(throwable);
+                                    })
+                            .build();
+
+            cancelTimeoutFuture();
+            timeoutFuture =
+                    TIMEOUT_SCHEDULER.schedule(
+                            () -> {
+                                String errorMessage =
+                                        "Timeout when subscribing to shard "
+                                                + shardId
+                                                + " with starting position "
+                                                + startingPosition
+                                                + " for consumer "
+                                                + consumerArn
+                                                + ".";
+                                synchronized (lockObject) {
+                                    // The timeout future was cancelled 
between firing and
+                                    // acquiring the lock (e.g. onSubscribe 
succeeded, or another
+                                    // error path disposed the subscriber). Do 
nothing.
+                                    if (timeoutFuture == null) {
+                                        return;
+                                    }
+                                    if (!disposeIfActive(subscriber)) {
+                                        return;
                                     }
-                                })
-                        .build();
-
-        // We don't need to keep track of the future here because we monitor 
subscription success
-        // using our own CountDownLatch
-        kinesis.subscribeToShard(consumerArn, shardId, startingPosition, 
responseHandler)
-                .exceptionally(
-                        throwable -> {
-                            // If consumer exists and is still activating, we 
want to countdown.
-                            if (ExceptionUtils.findThrowable(
-                                            throwable, 
ResourceInUseException.class)
-                                    .isPresent()) {
-                                waitForSubscriptionLatch.countDown();
+                                }
+                                LOG.error(errorMessage);
+                                setSubscriptionException(new 
TimeoutException(errorMessage));
+                            },
+                            subscriptionTimeout.toMillis(),
+                            TimeUnit.MILLISECONDS);
+
+            kinesis.subscribeToShard(consumerArn, shardId, startingPosition, 
responseHandler)
+                    .exceptionally(
+                            throwable -> {
+                                synchronized (lockObject) {
+                                    if (!disposeIfActive(subscriber)) {
+                                        return null;
+                                    }
+                                }
+                                LOG.error(
+                                        "Error exceptionally subscribing to 
shard {} with starting position {} for "
+                                                + "consumer {}.",
+                                        shardId,
+                                        startingPosition,
+                                        consumerArn,
+                                        throwable);
+                                setSubscriptionException(throwable);
                                 return null;
-                            }
-                            LOG.error(
-                                    "Error subscribing to shard {} with 
starting position {} for consumer {}.",
-                                    shardId,
-                                    startingPosition,
-                                    consumerArn,
-                                    throwable);
-                            terminateSubscription(throwable);
-                            return null;
-                        });
-
-        // We have to handle timeout for subscriptions separately because Java 
8 does not support a
-        // fluent orTimeout() methods on CompletableFuture.
-        CompletableFuture.runAsync(
-                () -> {
-                    try {
-                        if (waitForSubscriptionLatch.await(
-                                subscriptionTimeout.toMillis(), 
TimeUnit.MILLISECONDS)) {
-                            LOG.info(
-                                    "Successfully subscribed to shard {} with 
starting position {} for consumer {}.",
-                                    shardId,
-                                    startingPosition,
-                                    consumerArn);
-                            // Request first batch of records.
-                            shardSubscriber.requestRecords();
-
-                        } else {
-                            String errorMessage =
-                                    "Timeout when subscribing to shard "
-                                            + shardId
-                                            + " with starting position "
-                                            + startingPosition
-                                            + " for consumer "
-                                            + consumerArn
-                                            + ".";
-                            LOG.error(errorMessage);
-                            terminateSubscription(new 
TimeoutException(errorMessage));
-                        }
-                    } catch (InterruptedException e) {
-                        LOG.warn("Interrupted while waiting for subscription 
to complete.", e);
-                        terminateSubscription(e);
-                        Thread.currentThread().interrupt();
-                    }
-                });
+                            });
+        }
+    }
+
+    // Must be called while holding lockObject
+    private void cancelTimeoutFuture() {
+        if (timeoutFuture != null) {
+            timeoutFuture.cancel(false);
+            timeoutFuture = null;
+        }
+    }
+
+    // Must be called while holding lockObject
+    private boolean disposeIfActive(FanOutShardSubscriber subscriber) {
+        if (shardSubscriber != subscriber) {

Review Comment:
   do we need to pass in a subscriber and do this validation? Shouldn't there 
be just the one?



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -274,61 +300,52 @@ public SubscribeToShardEvent nextEvent() {
      * Streams.
      */
     private class FanOutShardSubscriber implements 
Subscriber<SubscribeToShardEventStream> {
-        private final CountDownLatch subscriptionLatch;
-        private Subscription subscription;
-
-        private final AtomicReference<SubscriptionState> subscriptionState =
-                new AtomicReference<>(SubscriptionState.NOT_STARTED);
-        private final AtomicBoolean isShardEnd = new AtomicBoolean(false);
-
-        private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
-            this.subscriptionLatch = subscriptionLatch;
-        }
-
-        /**
-         * Fetch the state that the subscriber is in.
-         *
-         * @return Subscription state for the subscriber.
-         */
-        public SubscriptionState getSubscriptionState() {
-            return subscriptionState.get();
-        }
 
-        /**
-         * Boolean whether this subscriber has reached the end of a shard.
-         *
-         * @return True if ShardEnd. false otherwise.
-         */
-        public boolean isShardEndReached() {
-            return isShardEnd.get();
-        }
+        private Subscription subscription;
 
         public void requestRecords() {
-            subscription.request(1);
-        }
-
-        public void cancel() {
-            if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
-                LOG.warn("Trying to cancel inactive subscription. Ignoring.");
-                return;
+            // subscription can be null if onSubscribe has not yet fired on a 
freshly activated
+            // subscriber. In that case the initial request(1) will be issued 
from onSubscribe
+            // itself, so it is safe to skip here.
+            if (subscription != null) {
+                subscription.request(1);
             }
+        }
 
+        public void cancelSubscription() {
             if (subscription != null) {
                 subscription.cancel();
             }
-            this.subscriptionState.set(SubscriptionState.COMPLETED);
         }
 
         @Override
         public void onSubscribe(Subscription subscription) {
-            LOG.info(
-                    "Successfully subscribed to shard {} at {} using consumer 
{}.",
-                    shardId,
-                    startingPosition,
-                    consumerArn);
-            this.subscription = subscription;
-            this.subscriptionState.set(SubscriptionState.SUBSCRIBED);
-            subscriptionLatch.countDown();
+            synchronized (lockObject) {
+                if (shardSubscriber != this) {
+                    // Timeout/error disposed this subscriber and a new one 
was created before SDK
+                    // called onSubscribe
+                    subscription.cancel();

Review Comment:
   Are we sure that the subscription passed in is not in use by the new 
subscriber?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to