pelaezryan commented on code in PR #247:
URL:
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3229586133
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -70,22 +70,37 @@ public class FanOutKinesisShardSubscription {
TimeoutException.class,
IOException.class,
LimitExceededException.class);
+ private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ r -> {
+ Thread t = new Thread(r,
"subscription-timeout-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
private final AsyncStreamProxy kinesis;
private final String consumerArn;
private final String shardId;
-
private final Duration subscriptionTimeout;
- // Queue is meant for eager retrieval of records from the Kinesis stream.
We will always have 2
- // record batches available on next read.
- private final BlockingQueue<SubscribeToShardEvent> eventQueue = new
LinkedBlockingQueue<>(2);
+ /**
+ * Number of events to keep in flight per subscriber. Pipelining the fetch
overlaps the
+ * server's next-event work with the consumer's drain work. Must match the
capacity of
+ * {@link #eventQueue}.
+ */
+ private static final int PREFETCH = 2;
+
+ private final BlockingQueue<SubscribeToShardEvent> eventQueue = new
LinkedBlockingQueue<>(PREFETCH);
private final AtomicReference<Throwable> subscriptionException = new
AtomicReference<>();
- // Store the current starting position for this subscription. Will be
updated each time new
- // batch of records is consumed
- private StartingPosition startingPosition;
+ // All fields below are guarded by lockObject
Review Comment:
nit: I could see this comment diverging easily with future changes. New
variables are likely to be added below this line. An alternative way to relate
the variables to the lock is to prefix/suffix the variable names to include
some indicator. (i.e. ``guardedTimeoutFuture``)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]