Juliusz Nadberezny created FLINK-38814:
------------------------------------------

             Summary: Kinesis Source in EFO - idle consumer on subscription failure
                 Key: FLINK-38814
                 URL: https://issues.apache.org/jira/browse/FLINK-38814
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: aws-connector-5.0.0
            Reporter: Juliusz Nadberezny


The {{FanOutKinesisShardSubscription}} can enter a zombie state where the 
connector appears active but consumes no data. This occurs when a subscription 
attempt fails (due to {{ResourceInUseException}}), triggering a silent 
{{NPE}} in the background activation thread.



The issue lies in the {{.exceptionally(...)}} block inside 
{{activateSubscription()}}.

When Kinesis throws a {{ResourceInUseException}} (meaning another consumer 
holds the shard), the error handler counts down {{waitForSubscriptionLatch}}. 
The waiting thread wakes up, *assuming the subscription was successful*, and 
attempts to use it, even though no subscription was ever established.

*The code path to failure:*
1. Trigger
{{kinesis.subscribeToShard}} fails with {{ResourceInUseException}}

2. The Trap
The error handler catches it
{code:java}
if (ExceptionUtils.findThrowable(throwable, ResourceInUseException.class).isPresent()) {
    waitForSubscriptionLatch.countDown(); // <--- TRAP: Signals "Success" to the waiter
    return null;
} {code}


3. The Crash
The background thread (inside {{CompletableFuture.runAsync}}) wakes up 
because the latch opened. It proceeds to the "success" block:
{code:java}
if (waitForSubscriptionLatch.await(...)) {
    subscriptionActive.set(true); // 1. Marks connector as ACTIVE
    shardSubscriber.requestRecords(); // 2. Calls method on subscriber
} {code}

4. The NPE
Inside {{shardSubscriber.requestRecords()}}:
{code:java}
public void requestRecords() {
    subscription.request(1); // <--- CRASH: 'subscription' is NULL
} {code}
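
The four steps above can be replayed with plain JDK classes. This is only a minimal sketch of the pattern, not the connector code: {{LatchTrapSketch}}, {{FakeSubscriber}} and {{replay()}} are hypothetical names, and a {{Runnable}} stands in for the real subscription object.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins; none of these names come from the connector.
class LatchTrapSketch {

    /** Mimics a subscriber whose 'subscription' is only set once subscribing succeeds. */
    static class FakeSubscriber {
        Runnable subscription; // stays null when the subscribe call fails

        void requestRecords() {
            subscription.run(); // NPE if the subscription was never set
        }
    }

    /** Replays steps 1-4 and returns the state the waiter ends up in. */
    static String replay() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean active = new AtomicBoolean(false);
        FakeSubscriber subscriber = new FakeSubscriber();

        // Steps 1-2: the subscribe call fails and the error handler releases the latch.
        latch.countDown();

        try {
            // Step 3: await() returns true, which the waiter reads as "subscribed".
            if (latch.await(1, TimeUnit.SECONDS)) {
                active.set(true);
                subscriber.requestRecords(); // Step 4: dereferences null
            }
        } catch (NullPointerException e) {
            return "active=" + active.get() + ", NPE";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return "active=" + active.get() + ", no NPE";
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints "active=true, NPE"
    }
}
```

The sketch ends with the active flag set and an NPE raised, which is the combination described in this report: a consumer marked active that never requests records.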


How to observe it in logs, with debug logging added to the two code paths:
{code:java}
if (ExceptionUtils.findThrowable(
                throwable, ResourceInUseException.class)
        .isPresent()) {
    LOG.warn("DEBUG: ResourceInUseException caught. Counting down latch (Original Logic triggers here).");
    waitForSubscriptionLatch.countDown();
    return null;
} {code}
{code:java}
try {
    shardSubscriber.requestRecords();
    LOG.info("DEBUG: [AsyncThread] requestRecords() success.");
} catch (NullPointerException e) {
    LOG.error("DEBUG: [AsyncThread] CRITICAL: Caught NPE in requestRecords! Subscription object is likely missing.", e);
    terminateSubscription(e);
}
{code}
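
One way the waiter could tell a failed subscription from a successful one is to record the failure before the latch is released and check it after {{await}} returns. The following is a hedged sketch of that idea using only JDK classes. It is not a fix proposed in this report and not the connector's code; {{LatchGuardSketch}}, {{onSubscribeFailed}}, {{onSubscribed}} and {{activate}} are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; class, field, and method names are illustrative only.
class LatchGuardSketch {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicBoolean active = new AtomicBoolean(false);
    final AtomicReference<Throwable> failure = new AtomicReference<>();

    /** Error-handler side: publish the cause first, then wake the waiter. */
    void onSubscribeFailed(Throwable cause) {
        failure.set(cause);
        latch.countDown();
    }

    /** Success side: nothing to record, just wake the waiter. */
    void onSubscribed() {
        latch.countDown();
    }

    /** Waiter side: true only if the latch opened without a recorded failure. */
    boolean activate() {
        try {
            if (!latch.await(1, TimeUnit.SECONDS)) {
                return false; // timed out waiting for either outcome
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (failure.get() != null) {
            return false; // latch opened because of an error, so stay inactive
        }
        active.set(true);
        return true;
    }

    public static void main(String[] args) {
        LatchGuardSketch failed = new LatchGuardSketch();
        failed.onSubscribeFailed(new IllegalStateException("shard in use"));
        System.out.println(failed.activate()); // prints "false"

        LatchGuardSketch ok = new LatchGuardSketch();
        ok.onSubscribed();
        System.out.println(ok.activate()); // prints "true"
    }
}
```

Setting the failure before calling {{countDown()}} matters: the latch gives the waiter a happens-before edge, so any thread that returns from {{await}} is guaranteed to see the recorded cause.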



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
