pelaezryan commented on code in PR #245:
URL:
https://github.com/apache/flink-connector-aws/pull/245#discussion_r3220614569
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -254,26 +275,48 @@ public SubscribeToShardEvent nextEvent() {
*/
private class FanOutShardSubscriber implements
Subscriber<SubscribeToShardEventStream> {
private final CountDownLatch subscriptionLatch;
-
private Subscription subscription;
+ private final AtomicReference<SubscriptionState> subscriptionState =
+ new AtomicReference<>(SubscriptionState.NOT_STARTED);
+ private final AtomicBoolean isShardEnd = new AtomicBoolean(false);
+
private FanOutShardSubscriber(CountDownLatch subscriptionLatch) {
this.subscriptionLatch = subscriptionLatch;
}
+ /**
+ * Fetch the state that the subscriber is in.
+ *
+ * @return Subscription state for the subscriber.
+ */
+ public SubscriptionState getSubscriptionState() {
+ return subscriptionState.get();
+ }
+
+ /**
+ * Boolean whether this subscriber has reached the end of a shard.
+ *
+ * @return True if ShardEnd. false otherwise.
+ */
+ public boolean isShardEndReached() {
+ return isShardEnd.get();
+ }
+
public void requestRecords() {
subscription.request(1);
}
public void cancel() {
- if (!subscriptionActive.get()) {
+ if (this.subscriptionState.get() == SubscriptionState.COMPLETED) {
Review Comment:
The log statement is technically correct in that its canceling a
subscription that is no longer active. But ill update the wording to more
closely align with the new states
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]