gguptp commented on code in PR #247:
URL:
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3265010017
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -102,100 +118,130 @@ public FanOutKinesisShardSubscription(
/** Method to allow eager activation of the subscription. */
public void activateSubscription() {
- LOG.info(
- "Activating subscription to shard {} with starting position {} for consumer {}.",
- shardId,
- startingPosition,
- consumerArn);
- if (shardSubscriber != null
- && shardSubscriber.getSubscriptionState() == SubscriptionState.SUBSCRIBED) {
- LOG.warn("Skipping activation of subscription since it is already active.");
- return;
- }
+ synchronized (lockObject) {
Review Comment:
I am wondering if we can make this lock-free.
`activateSubscription` is called by `FanoutKinesisShardSplitReader`, which is
single-threaded and holds all the subscriptions. I'm also not sure whether the
schedule function of the `TIMEOUT_SCHEDULER` will work as we expect.
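
For illustration only, here is a minimal sketch of what a lock-free activation
guard could look like, assuming the only requirement is that a second
activation is skipped while one is already active. `LockFreeActivationSketch`,
its `State` enum, and `activationCount` are hypothetical names and not the
PR's code; the real subscription logic is replaced by a counter.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the subscription class; not the PR's implementation.
class LockFreeActivationSketch {
    enum State { IDLE, SUBSCRIBED }

    private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
    // Stands in for the real "subscribe to shard" side effect.
    private final AtomicInteger activations = new AtomicInteger();

    /** Returns true if this call activated the subscription, false if it was already active. */
    boolean activateSubscription() {
        // compareAndSet lets exactly one caller move IDLE -> SUBSCRIBED, without a lock.
        if (!state.compareAndSet(State.IDLE, State.SUBSCRIBED)) {
            return false; // already active, skip
        }
        activations.incrementAndGet();
        return true;
    }

    /** Marks the subscription inactive so a later activation can proceed. */
    void markInactive() {
        state.set(State.IDLE);
    }

    int activationCount() {
        return activations.get();
    }
}
```

If the split reader really is the only caller, even the atomic may be more than
needed, but it keeps the guard safe should a timeout callback on another thread
ever touch the same state.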