pelaezryan commented on code in PR #247:
URL: 
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3229586133


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -70,22 +70,37 @@ public class FanOutKinesisShardSubscription {
                     TimeoutException.class,
                     IOException.class,
                     LimitExceededException.class);
+    private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
+            new ScheduledThreadPoolExecutor(
+                    1,
+                    r -> {
+                        Thread t = new Thread(r, 
"subscription-timeout-scheduler");
+                        t.setDaemon(true);
+                        return t;
+                    });
 
     private final AsyncStreamProxy kinesis;
     private final String consumerArn;
     private final String shardId;
-
     private final Duration subscriptionTimeout;
 
-    // Queue is meant for eager retrieval of records from the Kinesis stream. 
We will always have 2
-    // record batches available on next read.
-    private final BlockingQueue<SubscribeToShardEvent> eventQueue = new 
LinkedBlockingQueue<>(2);
+    /**
+     * Number of events to keep in flight per subscriber. Pipelining the fetch 
overlaps the
+     * server's next-event work with the consumer's drain work. Must match the 
capacity of
+     * {@link #eventQueue}.
+     */
+    private static final int PREFETCH = 2;
+
+    private final BlockingQueue<SubscribeToShardEvent> eventQueue = new 
LinkedBlockingQueue<>(PREFETCH);
     private final AtomicReference<Throwable> subscriptionException = new 
AtomicReference<>();
 
-    // Store the current starting position for this subscription. Will be 
updated each time new
-    // batch of records is consumed
-    private StartingPosition startingPosition;
+    // All fields below are guarded by lockObject

Review Comment:
   nit: I could see this comment diverging easily with future changes. New 
variables are likely to be added below this line. An alternative way to relate 
the variables to the lock is to prefix/suffix the variable names to include 
some indicator. (i.e. ``guardedTimeoutFuture``)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to