chenylee-aws commented on code in PR #247:
URL:
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3239593289
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -274,61 +300,52 @@ public SubscribeToShardEvent nextEvent() {
* Streams.
*/
private class FanOutShardSubscriber implements Subscriber<SubscribeToShardEventStream> {
- private final CountDownLatch subscriptionLatch;
- private Subscription subscription;
-
- private final AtomicReference<SubscriptionState> subscriptionState =
- new AtomicReference<>(SubscriptionState.NOT_STARTED);
- private final AtomicBoolean isShardEnd = new AtomicBoolean(false);
-
- private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
- this.subscriptionLatch = subscriptionLatch;
- }
-
- /**
- * Fetch the state that the subscriber is in.
- *
- * @return Subscription state for the subscriber.
- */
- public SubscriptionState getSubscriptionState() {
- return subscriptionState.get();
- }
- /**
- * Boolean whether this subscriber has reached the end of a shard.
- *
- * @return True if ShardEnd. false otherwise.
- */
- public boolean isShardEndReached() {
- return isShardEnd.get();
- }
+ private Subscription subscription;
public void requestRecords() {
- subscription.request(1);
- }
-
- public void cancel() {
- if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
- LOG.warn("Trying to cancel inactive subscription. Ignoring.");
- return;
+ // subscription can be null if onSubscribe has not yet fired on a freshly activated
+ // subscriber. In that case the initial request(1) will be issued from onSubscribe
+ // itself, so it is safe to skip here.
+ if (subscription != null) {
+ subscription.request(1);
}
+ }
+ public void cancelSubscription() {
if (subscription != null) {
subscription.cancel();
}
- this.subscriptionState.set(SubscriptionState.COMPLETED);
}
@Override
public void onSubscribe(Subscription subscription) {
- LOG.info(
- "Successfully subscribed to shard {} at {} using consumer {}.",
- shardId,
- startingPosition,
- consumerArn);
- this.subscription = subscription;
- this.subscriptionState.set(SubscriptionState.SUBSCRIBED);
- subscriptionLatch.countDown();
+ synchronized (lockObject) {
+ if (shardSubscriber != this) {
+ // Timeout/error disposed this subscriber and a new one was created before SDK
+ // called onSubscribe
+ subscription.cancel();
Review Comment:
Yes. The `Subscription` parameter is the reactive-streams handle for this
specific `subscribeToShard` call. It is bound one-to-one to this
`FanOutShardSubscriber` instance: the SDK creates a fresh `Subscription` per
call and passes it to `onSubscribe`, which Reactive Streams rule 2.12 allows to
be called at most once per `Subscriber`. The replacement subscriber has its own
`subscribeToShard` call, which produces its own distinct `Subscription`.
Cancelling this stale one releases only this call's HTTP/2 stream slot and does
not affect the new subscriber's stream.
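
To make the one-to-one binding concrete, here is a minimal sketch of the stale-subscriber guard. It is not the PR's code: it uses the JDK's `java.util.concurrent.Flow` interfaces instead of `org.reactivestreams` so that it compiles on its own, and `StaleSubscriberSketch`, `ShardSubscriber`, `RecordingSubscription`, and `activateNewSubscriber` are hypothetical names introduced for illustration. Only `lockObject`, `shardSubscriber`, and the identity check mirror the diff above.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class StaleSubscriberSketch {
    private final Object lockObject = new Object();
    // Currently active subscriber; guarded by lockObject.
    private ShardSubscriber shardSubscriber;

    /** Hypothetical stand-in for FanOutShardSubscriber. */
    class ShardSubscriber implements Flow.Subscriber<String> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            synchronized (lockObject) {
                if (shardSubscriber != this) {
                    // This subscriber was replaced before onSubscribe fired.
                    // Cancel only the handle passed to *this* instance.
                    subscription.cancel();
                    return;
                }
                this.subscription = subscription;
                subscription.request(1);
            }
        }

        @Override public void onNext(String item) { }
        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
    }

    /** Hypothetical Subscription that only records what was called on it. */
    static class RecordingSubscription implements Flow.Subscription {
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        final AtomicLong requested = new AtomicLong(0);

        @Override public void request(long n) { requested.addAndGet(n); }
        @Override public void cancel() { cancelled.set(true); }
    }

    /** Replaces the active subscriber, as a timeout/error path might (assumption). */
    ShardSubscriber activateNewSubscriber() {
        synchronized (lockObject) {
            shardSubscriber = new ShardSubscriber();
            return shardSubscriber;
        }
    }
}
```

If `activateNewSubscriber()` is called twice and each subscriber then receives its own `RecordingSubscription`, the first (stale) subscriber cancels its handle without requesting anything, while the second keeps its handle and requests one item. Neither call touches the other's `Subscription`.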