pelaezryan commented on code in PR #247:
URL:
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3229675147
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -102,100 +117,132 @@ public FanOutKinesisShardSubscription(
/** Method to allow eager activation of the subscription. */
public void activateSubscription() {
- LOG.info(
- "Activating subscription to shard {} with starting position {}
for consumer {}.",
- shardId,
- startingPosition,
- consumerArn);
- if (shardSubscriber != null
- && shardSubscriber.getSubscriptionState() ==
SubscriptionState.SUBSCRIBED) {
- LOG.warn("Skipping activation of subscription since it is already
active.");
- return;
- }
+ synchronized (lockObject) {
+ if (closed) {
+ LOG.info(
+ "Subscription for shard {} is closed; skipping
activation.",
+ shardId);
+ return;
+ }
+ if (startingPosition == null) {
+ LOG.info(
+ "Shard {} has been completely consumed (shard end).
Skipping re-subscription.",
+ shardId);
+ return;
+ }
+ if (shardSubscriber != null) {
+ LOG.warn(
+ "Shard {} Skipping activation of subscription since
one is already active or in progress.",
+ shardId);
+ return;
+ }
- // We have to use our own CountDownLatch to wait for subscription to
be acquired because
- // subscription event is tracked via the handler.
- CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
- shardSubscriber = new FanOutShardSubscriber(waitForSubscriptionLatch);
- SubscribeToShardResponseHandler responseHandler =
- SubscribeToShardResponseHandler.builder()
- .subscriber(() -> shardSubscriber)
- .onError(
- throwable -> {
- // Errors that occur when obtaining a
subscription are thrown
- // here.
- // After subscription is acquired, these
errors can be ignored.
- if (waitForSubscriptionLatch.getCount() >
0) {
- terminateSubscription(throwable);
- waitForSubscriptionLatch.countDown();
+ LOG.info(
+ "Activating subscription to shard {} with starting
position {} for consumer {}.",
+ shardId,
+ startingPosition,
+ consumerArn);
+
+ FanOutShardSubscriber subscriber = new FanOutShardSubscriber();
+ shardSubscriber = subscriber;
+
+ SubscribeToShardResponseHandler responseHandler =
+ SubscribeToShardResponseHandler.builder()
+ .subscriber(() -> subscriber)
+ .onError(
+ throwable -> {
+ synchronized (lockObject) {
+ if (!disposeIfActive(subscriber)) {
+ return;
+ }
+ }
+ LOG.error(
+ "Error onError subscribing to
shard {} with "
+ + "starting position
{} for consumer {}.",
+ shardId,
+ startingPosition,
+ consumerArn,
+ throwable);
+ setSubscriptionException(throwable);
+ })
+ .build();
+
+ cancelTimeoutFuture();
+ timeoutFuture =
+ TIMEOUT_SCHEDULER.schedule(
+ () -> {
+ String errorMessage =
+ "Timeout when subscribing to shard "
+ + shardId
+ + " with starting position "
+ + startingPosition
+ + " for consumer "
+ + consumerArn
+ + ".";
+ synchronized (lockObject) {
+ // The timeout future was cancelled
between firing and
+ // acquiring the lock (e.g. onSubscribe
succeeded, or another
+ // error path disposed the subscriber). Do
nothing.
+ if (timeoutFuture == null) {
+ return;
+ }
+ if (!disposeIfActive(subscriber)) {
+ return;
}
- })
- .build();
-
- // We don't need to keep track of the future here because we monitor
subscription success
- // using our own CountDownLatch
- kinesis.subscribeToShard(consumerArn, shardId, startingPosition,
responseHandler)
- .exceptionally(
- throwable -> {
- // If consumer exists and is still activating, we
want to countdown.
- if (ExceptionUtils.findThrowable(
- throwable,
ResourceInUseException.class)
- .isPresent()) {
- waitForSubscriptionLatch.countDown();
+ }
+ LOG.error(errorMessage);
+ setSubscriptionException(new
TimeoutException(errorMessage));
+ },
+ subscriptionTimeout.toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ kinesis.subscribeToShard(consumerArn, shardId, startingPosition,
responseHandler)
+ .exceptionally(
+ throwable -> {
+ synchronized (lockObject) {
+ if (!disposeIfActive(subscriber)) {
+ return null;
+ }
+ }
+ LOG.error(
+ "Error exceptionally subscribing to
shard {} with starting position {} for "
+ + "consumer {}.",
+ shardId,
+ startingPosition,
+ consumerArn,
+ throwable);
+ setSubscriptionException(throwable);
return null;
- }
- LOG.error(
- "Error subscribing to shard {} with
starting position {} for consumer {}.",
- shardId,
- startingPosition,
- consumerArn,
- throwable);
- terminateSubscription(throwable);
- return null;
- });
-
- // We have to handle timeout for subscriptions separately because Java
8 does not support a
- // fluent orTimeout() methods on CompletableFuture.
- CompletableFuture.runAsync(
- () -> {
- try {
- if (waitForSubscriptionLatch.await(
- subscriptionTimeout.toMillis(),
TimeUnit.MILLISECONDS)) {
- LOG.info(
- "Successfully subscribed to shard {} with
starting position {} for consumer {}.",
- shardId,
- startingPosition,
- consumerArn);
- // Request first batch of records.
- shardSubscriber.requestRecords();
-
- } else {
- String errorMessage =
- "Timeout when subscribing to shard "
- + shardId
- + " with starting position "
- + startingPosition
- + " for consumer "
- + consumerArn
- + ".";
- LOG.error(errorMessage);
- terminateSubscription(new
TimeoutException(errorMessage));
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for subscription
to complete.", e);
- terminateSubscription(e);
- Thread.currentThread().interrupt();
- }
- });
+ });
+ }
+ }
+
+ // Must be called while holding lockObject
+ private void cancelTimeoutFuture() {
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
+ timeoutFuture = null;
+ }
+ }
+
+ // Must be called while holding lockObject
+ private boolean disposeIfActive(FanOutShardSubscriber subscriber) {
+ if (shardSubscriber != subscriber) {
Review Comment:
Do we need to pass in a subscriber and do this validation? Shouldn't there
be just the one subscriber?
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -274,61 +300,52 @@ public SubscribeToShardEvent nextEvent() {
* Streams.
*/
private class FanOutShardSubscriber implements
Subscriber<SubscribeToShardEventStream> {
- private final CountDownLatch subscriptionLatch;
- private Subscription subscription;
-
- private final AtomicReference<SubscriptionState> subscriptionState =
- new AtomicReference<>(SubscriptionState.NOT_STARTED);
- private final AtomicBoolean isShardEnd = new AtomicBoolean(false);
-
- private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
- this.subscriptionLatch = subscriptionLatch;
- }
-
- /**
- * Fetch the state that the subscriber is in.
- *
- * @return Subscription state for the subscriber.
- */
- public SubscriptionState getSubscriptionState() {
- return subscriptionState.get();
- }
- /**
- * Boolean whether this subscriber has reached the end of a shard.
- *
- * @return True if ShardEnd. false otherwise.
- */
- public boolean isShardEndReached() {
- return isShardEnd.get();
- }
+ private Subscription subscription;
public void requestRecords() {
- subscription.request(1);
- }
-
- public void cancel() {
- if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
- LOG.warn("Trying to cancel inactive subscription. Ignoring.");
- return;
+ // subscription can be null if onSubscribe has not yet fired on a
freshly activated
+ // subscriber. In that case the initial request(1) will be issued
from onSubscribe
+ // itself, so it is safe to skip here.
+ if (subscription != null) {
+ subscription.request(1);
}
+ }
+ public void cancelSubscription() {
if (subscription != null) {
subscription.cancel();
}
- this.subscriptionState.set(SubscriptionState.COMPLETED);
}
@Override
public void onSubscribe(Subscription subscription) {
- LOG.info(
- "Successfully subscribed to shard {} at {} using consumer
{}.",
- shardId,
- startingPosition,
- consumerArn);
- this.subscription = subscription;
- this.subscriptionState.set(SubscriptionState.SUBSCRIBED);
- subscriptionLatch.countDown();
+ synchronized (lockObject) {
+ if (shardSubscriber != this) {
+ // Timeout/error disposed this subscriber and a new one
was created before SDK
+ // called onSubscribe
+ subscription.cancel();
Review Comment:
Are we sure that the subscription passed in is not in use by the new
subscriber?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]