Juliusz Nadberezny created FLINK-38814:
------------------------------------------
Summary: Kinesis Source in EFO - idle consumer on subscription failure
Key: FLINK-38814
URL: https://issues.apache.org/jira/browse/FLINK-38814
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Affects Versions: aws-connector-5.0.0
Reporter: Juliusz Nadberezny
The {{FanOutKinesisShardSubscription}} can enter a zombie state in which the
connector appears active but consumes no data. This happens when a subscription
attempt fails with {{ResourceInUseException}}, which triggers a silent
{{NPE}} in the background activation thread.
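The "silent" part is consistent with how {{CompletableFuture.runAsync}} behaves: an exception thrown by the task is stored in the returned future and is neither logged nor rethrown unless something observes that future. The sketch below is a self-contained illustration, not connector code, and assumes nothing joins or handles the future. {{SilentFailureDemo}}, {{Subscriber}} and the local {{Subscription}} interface are hypothetical stand-ins for the connector's subscriber and the Reactive Streams subscription.

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins, not connector code: shows that an exception thrown inside
// CompletableFuture.runAsync is captured in the returned future and stays silent
// unless something observes that future.
final class SilentFailureDemo {

    // Hypothetical minimal version of a Reactive Streams subscription.
    interface Subscription {
        void request(long n);
    }

    // Hypothetical subscriber whose subscription was never assigned.
    static final class Subscriber {
        Subscription subscription; // remains null

        void requestRecords() {
            subscription.request(1); // throws NullPointerException
        }
    }

    // Starts the activation task in the background and returns its future.
    static CompletableFuture<Void> activate(Subscriber subscriber) {
        return CompletableFuture.runAsync(subscriber::requestRecords);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future = activate(new Subscriber());
        // Wait for the task without rethrowing; the failure itself prints nothing.
        future.handle((ignored, error) -> null).join();
        System.out.println("completed exceptionally: " + future.isCompletedExceptionally());
    }
}
{code}

Running {{main}} prints {{completed exceptionally: true}} and no stack trace, which matches the symptom of an NPE that never reaches the logs.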
The issue lies in the {{.exceptionally(...)}} block inside
{{activateSubscription()}}.
When Kinesis throws a {{ResourceInUseException}} (for example, because a
subscription for the same consumer and shard is still active), the error
handler nevertheless counts down {{waitForSubscriptionLatch}}. The waiting
thread wakes up, *assuming the subscription was successful*, and attempts to
use it.
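The core problem is that a {{CountDownLatch}} carries no outcome: {{await(timeout, unit)}} returns {{true}} as soon as the count reaches zero, whichever path counted it down. The following minimal sketch reproduces this with hypothetical names ({{LatchAmbiguityDemo}}, {{Attempt}}, {{onResourceInUse}}) that only mirror the fields named in the report; it is not the connector's implementation.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reproduction, not connector code: CountDownLatch.await(timeout, unit)
// returns true once the count reaches zero, no matter which path counted it down.
final class LatchAmbiguityDemo {

    static final class Subscription { } // placeholder for the real subscription type

    static final class Attempt {
        final CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
        final AtomicBoolean subscriptionActive = new AtomicBoolean(false);
        volatile Subscription subscription; // assigned only on the success path

        // Success path: store the subscription, then release the waiter.
        void onSubscribe(Subscription s) {
            subscription = s;
            waitForSubscriptionLatch.countDown();
        }

        // Error path, mirroring the excerpt: release the waiter without a subscription.
        void onResourceInUse() {
            waitForSubscriptionLatch.countDown();
        }

        // Waiter logic: treats an open latch as success.
        boolean awaitAndActivate() {
            try {
                if (waitForSubscriptionLatch.await(1, TimeUnit.SECONDS)) {
                    subscriptionActive.set(true);
                    return true;
                }
                return false; // timed out
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Attempt failed = new Attempt();
        failed.onResourceInUse();
        boolean activated = failed.awaitAndActivate();
        System.out.println("activated=" + activated + ", subscription=" + failed.subscription);
    }
}
{code}

Running it prints {{activated=true, subscription=null}}: the waiter marks the subscription active even though nothing was subscribed.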
*The code path to failure:*
1. *Trigger:* {{kinesis.subscribeToShard}} fails with {{ResourceInUseException}}.
2. *The trap:* the error handler catches the exception, counts down the latch and returns normally:
{code:java}
if (ExceptionUtils.findThrowable(throwable, ResourceInUseException.class).isPresent()) {
    waitForSubscriptionLatch.countDown(); // <--- TRAP: signals "success" to the waiter
    return null;
}
{code}
3. *The crash:* the background thread (inside {{CompletableFuture.runAsync}}) wakes up
because the latch opened, and proceeds to the "success" block:
{code:java}
if (waitForSubscriptionLatch.await(...)) {
    subscriptionActive.set(true);     // 1. Marks the connector as ACTIVE
    shardSubscriber.requestRecords(); // 2. Calls a method on the subscriber
}
{code}
4. *The NPE:* {{shardSubscriber.requestRecords()}} dereferences {{subscription}}, which is still {{null}}:
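{code:java}
public void requestRecords() {
    subscription.request(1); // <--- CRASH: 'subscription' is null
}
{code}
Because the NPE is thrown inside the asynchronous task and nothing handles it, it is
never logged, while {{subscriptionActive}} stays {{true}}. The reader therefore looks
healthy but never receives records.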
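One possible direction for a fix, sketched under assumptions and not taken from an upstream patch: signal the outcome rather than just "done", for example by replacing the latch with a {{CompletableFuture<Subscription>}} that is completed on success and completed exceptionally on error. {{OutcomeAwareSubscription}} and its methods are hypothetical, and {{IllegalStateException}} stands in for {{ResourceInUseException}} so the example has no AWS SDK dependency.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical fix sketch, not the upstream patch: a future that carries the OUTCOME
// replaces the latch, so the waiter can tell "subscribed" apart from "failed".
final class OutcomeAwareSubscription {

    // Hypothetical minimal version of a Reactive Streams subscription.
    interface Subscription {
        void request(long n);
    }

    private final CompletableFuture<Subscription> subscribed = new CompletableFuture<>();
    final AtomicBoolean subscriptionActive = new AtomicBoolean(false);

    // Success path: hand the real subscription to the waiter.
    void onSubscribe(Subscription s) {
        subscribed.complete(s);
    }

    // Error path (e.g. ResourceInUseException): report failure instead of "done".
    void onError(Throwable t) {
        subscribed.completeExceptionally(t);
    }

    // Returns true only if a real subscription arrived and records were requested.
    boolean awaitAndActivate(long timeoutMillis) {
        try {
            Subscription s = subscribed.get(timeoutMillis, TimeUnit.MILLISECONDS);
            subscriptionActive.set(true);
            s.request(1);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            // Failure or timeout: stay inactive so the caller can retry or fail the reader.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        OutcomeAwareSubscription failed = new OutcomeAwareSubscription();
        failed.onError(new IllegalStateException("subscription already active"));
        System.out.println("failed attempt active? " + failed.awaitAndActivate(100));

        OutcomeAwareSubscription ok = new OutcomeAwareSubscription();
        ok.onSubscribe(n -> System.out.println("requested " + n));
        System.out.println("ok attempt active? " + ok.awaitAndActivate(100));
    }
}
{code}

With this shape the waiter only sets {{subscriptionActive}} once it actually holds a subscription; how the failure is then handled (retry with backoff, or failing the reader) is left to the surrounding reader logic.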
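*How to observe it in the logs:* add temporary debug logging to the error handler and around the {{requestRecords()}} call in the background thread: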
{code:java}
if (ExceptionUtils.findThrowable(throwable, ResourceInUseException.class).isPresent()) {
    LOG.warn("DEBUG: ResourceInUseException caught. Counting down latch (Original Logic triggers here).");
    waitForSubscriptionLatch.countDown();
    return null;
}
{code}
{code:java}
try {
    shardSubscriber.requestRecords();
    LOG.info("DEBUG: [AsyncThread] requestRecords() success.");
} catch (NullPointerException e) {
    LOG.error("DEBUG: [AsyncThread] CRITICAL: Caught NPE in requestRecords! Subscription object is likely missing.", e);
    terminateSubscription(e);
}
{code}
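Instead of wrapping individual calls in {{try}}/{{catch}}, a more general way to make such failures visible is to attach a completion handler to the background future, so any throwable from the activation task is reported and forwarded to a termination callback. In this sketch {{ObservedActivation}}, {{runObserved}} and the {{terminate}} callback are hypothetical (the callback plays the role of {{terminateSubscription}} above), and {{System.err}} stands in for the connector's logger.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Hypothetical helper, not connector API: observes the background activation future so
// that any throwable is reported and handed to a termination callback.
final class ObservedActivation {

    static CompletableFuture<Void> runObserved(Runnable activation, Consumer<Throwable> terminate) {
        return CompletableFuture.runAsync(activation)
                .whenComplete((ignored, error) -> {
                    if (error == null) {
                        return; // activation succeeded
                    }
                    // runAsync wraps the task's exception in a CompletionException; unwrap it.
                    Throwable cause = error instanceof CompletionException && error.getCause() != null
                            ? error.getCause()
                            : error;
                    System.err.println("Subscription activation failed: " + cause);
                    terminate.accept(cause);
                });
    }

    public static void main(String[] args) {
        runObserved(
                () -> { throw new NullPointerException("subscription is null"); },
                cause -> System.out.println("terminating subscription because of " + cause))
            .handle((ignored, error) -> null) // already reported above; do not rethrow here
            .join();
    }
}
{code}

Because {{whenComplete}} runs for every outcome, a new failure mode in the activation task is surfaced without having to predict which exception type to catch.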