leekeiabstraction commented on code in PR #208:
URL: 
https://github.com/apache/flink-connector-aws/pull/208#discussion_r2140249231


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java:
##########
@@ -19,43 +19,242 @@
 package org.apache.flink.connector.kinesis.source.reader.fanout;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
 import 
org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
+import static 
org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT;
 
 /**
  * An implementation of the KinesisShardSplitReader that consumes from Kinesis 
using Enhanced
  * Fan-Out and HTTP/2.
  */
 @Internal
 public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase 
{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class);
     private final AsyncStreamProxy asyncStreamProxy;
     private final String consumerArn;
     private final Duration subscriptionTimeout;
+    private final Duration deregisterTimeout;
+
+    /**
+     * Shared executor service for all shard subscriptions.
+     *
+     * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) 
but this does not pose
+     * a risk of out-of-memory errors due to the natural flow control 
mechanisms in the system:
+     *
+     * <ol>
+     *   <li>Each {@link FanOutKinesisShardSubscription} has a bounded event 
queue with capacity of 2</li>
+     *   <li>New records are only requested after processing an event (via 
{@code requestRecords()})</li>
+     *   <li>When a shard's queue is full, the processing thread blocks at the 
{@code put()} operation</li>

Review Comment:
   I think I'm missing context to interpret how the backpressure works. 
   
   Can you clarify if put() and requestRecords() is done in order and 
requestRecords() is done only after put()? If so, that was the information that 
I was missing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to