aws-nageshvh commented on code in PR #208:
URL: https://github.com/apache/flink-connector-aws/pull/208#discussion_r2098924842
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java:
##########
@@ -45,17 +52,128 @@ public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
private final String consumerArn;
private final Duration subscriptionTimeout;
+ /**
+ * Shared executor service for all shard subscriptions.
+ *
+ * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) to ensure no tasks are ever rejected.
+ * Although the queue is technically unbounded, the system has natural flow control mechanisms that effectively bound the queue size:
+ *
+ * <ol>
+ * <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li>
+ * <li>New records are only requested after processing an event (via {@code requestRecords()})</li>
+ * <li>The maximum number of queued tasks is effectively bounded by {@code 2 * number_of_shards}</li>
+ * </ol>
+ *
+ * <p>This design provides natural backpressure while ensuring no records are dropped, making it safe to use an unbounded executor queue.
+ */
+ private final ExecutorService sharedShardSubscriptionExecutor;
+
private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>();
+ /**
+ * Factory for creating subscriptions. This is primarily used for testing.
+ */
+ @VisibleForTesting
+ public interface SubscriptionFactory {
+ FanOutKinesisShardSubscription createSubscription(
+ AsyncStreamProxy proxy,
+ String consumerArn,
+ String shardId,
+ StartingPosition startingPosition,
+ Duration timeout,
+ ExecutorService executor);
+ }
+
+ /**
+ * Default implementation of the subscription factory.
+ */
+ private static class DefaultSubscriptionFactory implements SubscriptionFactory {
+ @Override
+ public FanOutKinesisShardSubscription createSubscription(
+ AsyncStreamProxy proxy,
+ String consumerArn,
+ String shardId,
+ StartingPosition startingPosition,
+ Duration timeout,
+ ExecutorService executor) {
+ return new FanOutKinesisShardSubscription(
+ proxy,
+ consumerArn,
+ shardId,
+ startingPosition,
+ timeout,
+ executor);
+ }
+ }
+
+ private SubscriptionFactory subscriptionFactory;
+
public FanOutKinesisShardSplitReader(
AsyncStreamProxy asyncStreamProxy,
String consumerArn,
Map<String, KinesisShardMetrics> shardMetricGroupMap,
Configuration configuration) {
+ this(asyncStreamProxy, consumerArn, shardMetricGroupMap, configuration, new DefaultSubscriptionFactory());
+ }
+
+ @VisibleForTesting
+ FanOutKinesisShardSplitReader(
+ AsyncStreamProxy asyncStreamProxy,
+ String consumerArn,
+ Map<String, KinesisShardMetrics> shardMetricGroupMap,
+ Configuration configuration,
+ SubscriptionFactory subscriptionFactory) {
+ this(
+ asyncStreamProxy,
+ consumerArn,
+ shardMetricGroupMap,
+ configuration,
+ subscriptionFactory,
+ createDefaultExecutor());
+ }
+
+ /**
+ * Constructor with injected executor service for testing.
+ *
+ * @param asyncStreamProxy The proxy for Kinesis API calls
+ * @param consumerArn The ARN of the consumer
+ * @param shardMetricGroupMap The metrics map
+ * @param configuration The configuration
+ * @param subscriptionFactory The factory for creating subscriptions
+ * @param executorService The executor service to use for subscription tasks
+ */
+ @VisibleForTesting
+ FanOutKinesisShardSplitReader(
+ AsyncStreamProxy asyncStreamProxy,
+ String consumerArn,
+ Map<String, KinesisShardMetrics> shardMetricGroupMap,
+ Configuration configuration,
+ SubscriptionFactory subscriptionFactory,
+ ExecutorService executorService) {
super(shardMetricGroupMap, configuration);
this.asyncStreamProxy = asyncStreamProxy;
this.consumerArn = consumerArn;
this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
+ this.subscriptionFactory = subscriptionFactory;
+ this.sharedShardSubscriptionExecutor = executorService;
+ }
+
+ /**
+ * Creates the default executor service for subscription tasks.
+ *
+ * @return A new executor service
+ */
+ private static ExecutorService createDefaultExecutor() {
+ int minThreads = Runtime.getRuntime().availableProcessors();
+ int maxThreads = minThreads * 2;
+ return new ThreadPoolExecutor(
+ minThreads,
+ maxThreads,
+ 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control
Review Comment:
Yes, this is covered in the documentation too, which I further refined in the next iteration by adding more details.
Despite using an unbounded queue in the executor, we are highly unlikely to run into OOM problems because:
1. The number of threads is limited (2x the number of cores)
2. Each shard has a bounded event queue with capacity 2
3. We only request more records after successfully processing and storing an event
4. If a shard's queue is full, processing blocks, creating back-pressure
5. This effectively bounds the total number of events in memory to `2 * number_of_shards`
In the worst-case scenario during backpressure, the maximum number of events in memory is `Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads)`.
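The flow-control argument above can be sketched in isolation (illustrative only; `worstCaseEvents` and the queue here are hypothetical stand-ins, not the actual connector code): a bounded per-shard queue refuses or blocks further events once full, which is what keeps the executor's unbounded task queue from growing without limit.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureSketch {
    // Hypothetical helper computing the worst-case bound from the comment:
    // 2 events buffered per shard, plus one in-flight event per busy thread.
    static int worstCaseEvents(int shards, int threads) {
        return 2 * shards + Math.min(shards, threads);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a shard's bounded event queue (capacity 2).
        BlockingQueue<String> shardEventQueue = new ArrayBlockingQueue<>(2);
        shardEventQueue.put("event-1");
        shardEventQueue.put("event-2");
        // A third event cannot be enqueued until the consumer drains the
        // queue; offer() returns false instead of growing the queue, so no
        // further records are requested for this shard.
        boolean accepted = shardEventQueue.offer("event-3");
        System.out.println("third event accepted: " + accepted);
        System.out.println("worst case, 10 shards / 8 threads: "
                + worstCaseEvents(10, 8));
    }
}
```

With 10 shards and 8 threads this gives a bound of 28 buffered events, matching the formula above.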
I have run a few tests to validate these scenarios:
a) 10 shards to one JVM with 4 GB of memory, running from TRIM through 10 days of backfill. The app catches up without a job restart or OOM.
b) 1 shard to one JVM with 4 GB of memory, running from TRIM through 10 days of backfill. The app catches up without a job restart or OOM.
Both apps write to a 1-shard destination stream, which keeps them constantly under backpressure until they catch up to the tip.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]