leekeiabstraction commented on code in PR #208:
URL: https://github.com/apache/flink-connector-aws/pull/208#discussion_r2149589839
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java:
##########
@@ -19,43 +19,242 @@
package org.apache.flink.connector.kinesis.source.reader.fanout;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT;
/**
 * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced
 * Fan-Out and HTTP/2.
 */
@Internal
public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class);
    private final AsyncStreamProxy asyncStreamProxy;
    private final String consumerArn;
    private final Duration subscriptionTimeout;
+    private final Duration deregisterTimeout;
+
+    /**
+     * Shared executor service for all shard subscriptions.
+     *
+     * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) but this does not pose
+     * a risk of out-of-memory errors due to the natural flow control mechanisms in the system:
+     *
+     * <ol>
+     *   <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li>
+     *   <li>New records are only requested after processing an event (via {@code requestRecords()})</li>
+     *   <li>When a shard's queue is full, the processing thread blocks at the {@code put()} operation</li>
+     *   <li>The AWS SDK implements the Reactive Streams protocol with built-in backpressure</li>
+     * </ol>
+     *
+     * <p>In the worst-case scenario during backpressure, the maximum number of events in memory is:
+     * <pre>
+     * Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads)
+     * </pre>
+     *
+     * <p>Where:
+     * <ul>
+     *   <li>2 * Number_of_Shards: Total capacity of all shard event queues</li>
+     *   <li>min(Number_of_Shards, Number_of_Threads): Maximum events being actively processed</li>
+     * </ul>
+     *
+     * <p>This ensures that memory usage scales linearly with the number of shards, not exponentially,
+     * making it safe to use an unbounded executor queue even with a large number of shards.
+     */
+    private final ExecutorService sharedShardSubscriptionExecutor;
+
+    /**
+     * Shared executor service for making subscribeToShard API calls.
+     *
+     * <p>This executor is separate from the event processing executor to avoid contention
+     * between API calls and event processing. Using a dedicated executor for subscription calls
+     * provides several important benefits:
+     *
+     * <ol>
+     *   <li>Prevents blocking of the main thread or event processing threads during API calls</li>
+     *   <li>Isolates API call failures from event processing operations</li>
+     *   <li>Allows for controlled concurrency of API calls across multiple shards</li>
+     *   <li>Prevents potential deadlocks that could occur when the same thread handles both
+     *       subscription calls and event processing</li>
+     * </ol>
+     *
+     * <p>The executor uses a smaller number of threads than the event processing executor since
+     * subscription calls are less frequent and primarily I/O bound. This helps optimize resource
+     * usage while still providing sufficient parallelism for multiple concurrent subscription calls.
+     */
+    private final ExecutorService sharedSubscriptionCallExecutor;
    private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>();
+    /**
+     * Factory for creating subscriptions. This is primarily used for testing.
+     */
+    @VisibleForTesting
+    public interface SubscriptionFactory {
+        FanOutKinesisShardSubscription createSubscription(
+                AsyncStreamProxy proxy,
+                String consumerArn,
+                String shardId,
+                StartingPosition startingPosition,
+                Duration timeout,
+                ExecutorService eventProcessingExecutor,
+                ExecutorService subscriptionCallExecutor);
+    }
+
+    /**
+     * Default implementation of the subscription factory.
+     */
+    private static class DefaultSubscriptionFactory implements SubscriptionFactory {
+        @Override
+        public FanOutKinesisShardSubscription createSubscription(
+                AsyncStreamProxy proxy,
+                String consumerArn,
+                String shardId,
+                StartingPosition startingPosition,
+                Duration timeout,
+                ExecutorService eventProcessingExecutor,
+                ExecutorService subscriptionCallExecutor) {
+            return new FanOutKinesisShardSubscription(
+                    proxy,
+                    consumerArn,
+                    shardId,
+                    startingPosition,
+                    timeout,
+                    eventProcessingExecutor,
+                    subscriptionCallExecutor);
+        }
+    }
+
+    private SubscriptionFactory subscriptionFactory;
+
    public FanOutKinesisShardSplitReader(
            AsyncStreamProxy asyncStreamProxy,
            String consumerArn,
            Map<String, KinesisShardMetrics> shardMetricGroupMap,
            Configuration configuration) {
+        this(asyncStreamProxy, consumerArn, shardMetricGroupMap, configuration, new DefaultSubscriptionFactory());
+    }
+
+    @VisibleForTesting
+    FanOutKinesisShardSplitReader(
+            AsyncStreamProxy asyncStreamProxy,
+            String consumerArn,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap,
+            Configuration configuration,
+            SubscriptionFactory subscriptionFactory) {
+        this(
+                asyncStreamProxy,
+                consumerArn,
+                shardMetricGroupMap,
+                configuration,
+                subscriptionFactory,
+                createDefaultEventProcessingExecutor(),
+                createDefaultSubscriptionCallExecutor());
+    }
+
+    /**
+     * Constructor with injected executor services for testing.
+     *
+     * @param asyncStreamProxy The proxy for Kinesis API calls
+     * @param consumerArn The ARN of the consumer
+     * @param shardMetricGroupMap The metrics map
+     * @param configuration The configuration
+     * @param subscriptionFactory The factory for creating subscriptions
+     * @param eventProcessingExecutor The executor service to use for event processing tasks
+     * @param subscriptionCallExecutor The executor service to use for subscription API calls
+     */
+    @VisibleForTesting
+    FanOutKinesisShardSplitReader(
+            AsyncStreamProxy asyncStreamProxy,
+            String consumerArn,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap,
+            Configuration configuration,
+            SubscriptionFactory subscriptionFactory,
+            ExecutorService eventProcessingExecutor,
+            ExecutorService subscriptionCallExecutor) {
        super(shardMetricGroupMap, configuration);
        this.asyncStreamProxy = asyncStreamProxy;
        this.consumerArn = consumerArn;
        this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
+        this.deregisterTimeout = configuration.get(EFO_DEREGISTER_CONSUMER_TIMEOUT);
+        this.subscriptionFactory = subscriptionFactory;
+        this.sharedShardSubscriptionExecutor = eventProcessingExecutor;
+        this.sharedSubscriptionCallExecutor = subscriptionCallExecutor;
Review Comment:
I've been thinking about this approach, where `FanOutKinesisShardSplitReader` instantiates the ExecutorServices and passes them to `FanOutKinesisShardSubscription`. I have the following questions:
1. Would a backpressured shard drown out other shards? Specifically, if one shard has a large backlog and is severely backpressured by its downstream, we would expect the `subscriptionEventProcessingExecutor`'s input queue to accumulate a large number of callables for that shard. The quieter shards' callables would then be backpressured as well, since they are placed in the same queue (see the sketch after these questions). This was not the case in the current design, where a shard is never backpressured by its neighbours' downstream. It may also cause something akin to data loss: an idle watermarking strategy could drop the now-late-arriving records from the non-backpressured, quieter shards.
2. Are we breaking abstraction here by defining and passing around executor
services?
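
To make question 1 concrete, here's a minimal, self-contained sketch of the head-of-line blocking I'm describing. This is illustrative only, not the connector's actual code: the class and the downstream queue are stand-ins. With a single shared executor, a callable blocked on a full downstream queue for a hot shard delays an unrelated callable for a quiet shard.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedExecutorHeadOfLineBlocking {
    public static void main(String[] args) throws InterruptedException {
        // One worker thread stands in for the shared event-processing executor.
        ExecutorService shared = Executors.newSingleThreadExecutor();

        // Stand-in for a severely backpressured downstream: capacity 1, already full.
        BlockingQueue<String> hotShardDownstream = new ArrayBlockingQueue<>(1);
        hotShardDownstream.put("in-flight");

        // The hot shard's callable blocks on put() because its downstream is full...
        shared.submit(() -> {
            try {
                hotShardDownstream.put("hot-shard-event"); // blocks indefinitely
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // ...so the quiet shard's callable waits behind it in the shared queue and
        // never runs, even though its own downstream has plenty of capacity.
        shared.submit(() -> System.out.println("quiet-shard-event processed"));
    }
}
```

With more worker threads the effect is the same once the number of blocked hot-shard callables reaches the pool size.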
------
Instead of letting the SplitReader instantiate the ExecutorService, an alternative approach we could consider (sketched below) is:
1. FanOutKinesisShardSubscription instantiates its own executor service, e.g. `Executors.newSingleThreadExecutor()`,
2. FanOutKinesisShardSubscription manages the lifecycle of that executor service, and
3. the executor service is used both for handling subscription events and for re-subscribing.
This would ensure that a single backpressured shard cannot impact the throughput of other, non-backpressured shards, and that the use of ExecutorService is fully encapsulated within FanOutKinesisShardSubscription. It does change the scaling: we'd scale threads at 1x the number of shards instead of 1-2x the number of processors. However, I don't see a big drawback here, since we usually match (or recommend matching) the number of cores (KPUs) to the number of shards.
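
A rough sketch of the shape I have in mind (hypothetical; `ShardSubscriptionSketch`, `onEvent`, `resubscribe`, and the private helpers are illustrative names, not the actual connector API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;

// Hypothetical shape of the alternative: the subscription owns a
// single-thread executor and its full lifecycle.
class ShardSubscriptionSketch implements AutoCloseable {

    // One thread per shard: a blocked shard only ever stalls itself.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void onEvent(SubscribeToShardEvent event) {
        // Event handling and re-subscription share the same single thread,
        // so per-shard ordering is preserved without extra synchronization.
        executor.submit(() -> processEvent(event));
    }

    void resubscribe() {
        executor.submit(this::callSubscribeToShard);
    }

    @Override
    public void close() {
        // Lifecycle is fully encapsulated in the subscription.
        executor.shutdownNow();
    }

    private void processEvent(SubscribeToShardEvent event) { /* deserialize + enqueue */ }

    private void callSubscribeToShard() { /* issue the SubscribeToShard API call */ }
}
```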
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]