chenylee-aws commented on code in PR #247:
URL:
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3268212305
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -102,100 +118,130 @@ public FanOutKinesisShardSubscription(
/** Method to allow eager activation of the subscription. */
public void activateSubscription() {
- LOG.info(
- "Activating subscription to shard {} with starting position {} for consumer {}.",
- shardId,
- startingPosition,
- consumerArn);
- if (shardSubscriber != null
- && shardSubscriber.getSubscriptionState() == SubscriptionState.SUBSCRIBED) {
- LOG.warn("Skipping activation of subscription since it is already active.");
- return;
- }
+ synchronized (lockObject) {
Review Comment:
The `handleSplitChange` method always starts a new
`FanOutKinesisShardSubscription`, so no subscription is already active when
`activateSubscription()` runs and this lock is effectively a no-op. Besides
that, no long-running blocking operation is performed inside any of the
synchronized blocks, so I don't expect any contention.
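To illustrate the point, here is a minimal sketch under stated assumptions. `SketchSubscription` and `SketchReader` are hypothetical stand-ins, not the connector's actual classes, and the sketch assumes each split change creates a fresh subscription with its own private lock:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a per-shard subscription (not the real connector class). */
class SketchSubscription {
    // Each instance owns its lock, so two instances never compete for the same monitor.
    private final Object lockObject = new Object();
    private boolean active;

    void activateSubscription() {
        synchronized (lockObject) {
            if (active) {
                // Never reached on a freshly constructed instance.
                return;
            }
            // Only a cheap state flip happens under the lock; nothing blocks here.
            active = true;
        }
    }

    boolean isActive() {
        synchronized (lockObject) {
            return active;
        }
    }
}

/** Hypothetical reader that mirrors the "always start a new subscription" behavior. */
class SketchReader {
    private final Map<String, SketchSubscription> subscriptions = new HashMap<>();

    SketchSubscription handleSplitChange(String shardId) {
        // Assumption: a new subscription is created for every split change,
        // so the instance being activated cannot already be active.
        SketchSubscription subscription = new SketchSubscription();
        subscription.activateSubscription();
        subscriptions.put(shardId, subscription);
        return subscription;
    }
}
```

If that assumption holds, the `synchronized` block is always acquired uncontended on the activation path and the early-return branch is dead code there.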