chenylee-aws opened a new pull request, #247: URL: https://github.com/apache/flink-connector-aws/pull/247
…ng behavior and concurrent activation race in FanOutKinesisShardSubscription

## Purpose of the change

Fix FLINK-39660 (https://issues.apache.org/jira/browse/FLINK-39660): a cascading failure in the Kinesis EFO subscription lifecycle. Under transient disruptions (downstream slow consumer, GC pause, brief network error) the connector enters a self-sustaining retry storm that stalls consumption across many shards, leaks HTTP/2 streams, and saturates the JVM common ForkJoinPool. See the JIRA for observed symptoms and an illustrative stack trace.

This PR addresses the root causes in FanOutKinesisShardSubscription:

- Netty event loop no longer blocks on record processing. Switched from blocking eventQueue.put() to non-blocking offer(), with a prefetch-based pull model (priming PREFETCH - queueSize requests in onSubscribe, issuing request(1) after each consumer drain). The invariant queue.size + outstandingRequests == PREFETCH prevents overflow by construction.
- Concurrent activations on the same shard are prevented. Replaced the subscriptionActive-based guard (which only blocked after onSubscribe) with an identity-based guard on shardSubscriber. Activations during the pending window are now correctly rejected.
- Identity-based disposal. All four error paths (responseHandler.onError, .exceptionally, timeout callback, Subscriber.onError) funnel through disposeIfActive(subscriber); only the winning path cleans up. Losing paths no-op instead of corrupting state.
- cancel() now runs unconditionally. Removed the subscriptionActive gate that made cancel a no-op during the pending window; failed subscriptions now promptly release their HTTP/2 stream slot.
- Timeout watcher moved off the common ForkJoinPool onto a dedicated single-thread ScheduledExecutorService; common pool is no longer saturated by latch watchers.
- Timeout/onSubscribe race closed. Added a timeoutFuture == null guard inside the timeout callback so a late-firing timeout cannot tear down a subscription that has just successfully established.
- Stale onNext deliveries dropped via identity check. If the AWS SDK delivers an event on a disposed subscriber, it is silently ignored (the server will redeliver it on the replacement subscription).
- Added close() invoked from FanOutKinesisShardSplitReader.close() for clean shutdown; pending subscriptions are disposed and further activations are rejected.
- KinesisStreamsSource now passes user-configured properties to the Netty HTTP client instead of an empty AttributeMap, so http-client.max-concurrency and related settings actually apply.

## Verifying this change

This change added tests and can be verified as follows:

- Added 22 unit tests in FanOutKinesisShardSubscriptionTest covering:
  - Happy-path activation, EFO 5-minute rotation, shard-end handling
  - Concurrent activation prevention (the main correctness bug)
  - Recoverable / unrecoverable / ResourceNotFoundException error paths
  - Dual error-path deduplication via identity check
  - Subscription timeout cleanup and retry
  - Stale onSubscribe / stale onNext handling after disposal
  - close() behavior and idempotency
  - Pull-based backpressure (initial request equals PREFETCH, request-per-drain, no spurious requests on empty queue)
  - Reactivation invariant (priming equals PREFETCH - queueSize, not always PREFETCH)
  - End-to-end pipeline depth check (queue.size + outstanding == PREFETCH throughout a full drain cycle)
- Tests use a scripted AsyncStreamProxy fake that lets each test deterministically drive onSubscribe, onNext, onComplete, handler.onError, and future.completeExceptionally signals. Avoids Thread.sleep in favour of Awaitility polling.
- Load-tested against a 200-shard stream at parallelism 18 for ~90 minutes before and after the change.
  With the fix: no concurrent activations observed, no zombie subscriptions, no ForkJoinPool common-pool saturation in thread dumps, no Netty event-loop blocking on put.
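
For readers unfamiliar with the prefetch-based pull model named under "Purpose of the change", the following is a minimal single-threaded sketch of the idea, not the connector's code. The class `PrefetchQueueSketch`, its method names, and the plain `int` counter are hypothetical and exist only for illustration; the parts taken from the description are the use of `offer()` instead of `put()`, priming `PREFETCH - queueSize` requests on subscribe, issuing one request per drained event, and the invariant `queue.size + outstandingRequests == PREFETCH`. A real implementation would presumably need thread-safe bookkeeping, since delivery and draining happen on different threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of a prefetch-based pull model; not the connector's actual class. */
final class PrefetchQueueSketch {
    private final int prefetch;
    private final BlockingQueue<String> queue;
    // Requests issued upstream that have not yet been answered by an onNext.
    private int outstanding;

    PrefetchQueueSketch(int prefetch) {
        this.prefetch = prefetch;
        this.queue = new ArrayBlockingQueue<>(prefetch);
    }

    /** Models onSubscribe: prime only the free slots, so a reactivation accounts for queued events. */
    int onSubscribe() {
        outstanding = prefetch - queue.size();
        return outstanding; // stands in for subscription.request(n)
    }

    /** Models onNext on the I/O thread: never blocks. */
    void onNext(String event) {
        outstanding--;
        // offer() returns false instead of blocking; with the invariant intact it cannot fail.
        if (!queue.offer(event)) {
            throw new IllegalStateException("queue overflow: invariant violated");
        }
    }

    /** Models the consumer drain: one new request per event actually taken, none on an empty queue. */
    String poll() {
        String event = queue.poll();
        if (event != null) {
            outstanding++; // stands in for subscription.request(1)
        }
        return event;
    }

    int queued() {
        return queue.size();
    }

    int outstanding() {
        return outstanding;
    }

    public static void main(String[] args) {
        PrefetchQueueSketch s = new PrefetchQueueSketch(4);
        System.out.println("primed: " + s.onSubscribe());
        s.onNext("event-1");
        s.onNext("event-2");
        System.out.println("drained: " + s.poll());
        System.out.println("queued + outstanding: " + (s.queued() + s.outstanding()));
    }
}
```

Because every delivered event consumes one outstanding request and every drained event issues exactly one new request, the sum of queued events and outstanding requests stays at the prefetch size in this sketch, which is why the bounded queue cannot overflow.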
