[ https://issues.apache.org/jira/browse/KAFKA-17139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Wang updated KAFKA-17139:
----------------------------
    Description: 
Recently we found that data mirroring for one of our partitions stopped after 
the following exception was thrown:

 
{code:java}
[2024-07-05 13:36:07,058] WARN Failure during poll. (org.apache.kafka.connect.mirror.RheosHaMirrorSourceTask)
org.apache.kafka.common.protocol.types.SchemaException: Buffer underflow while parsing response for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-null-8, correlationId=-855959214)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:722)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:865)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
        at org.apache.kafka.connect.mirror.RheosHaMirrorSourceTask.poll(RheosHaMirrorSourceTask.java:130)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.BufferUnderflowException
        at java.nio.Buffer.nextGetIndex(Buffer.java:510)
        at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:427)
        at org.apache.kafka.common.protocol.ByteBufferAccessor.readLong(ByteBufferAccessor.java:48)
        at org.apache.kafka.common.message.FetchResponseData$AbortedTransaction.read(FetchResponseData.java:1928)
        at org.apache.kafka.common.message.FetchResponseData$AbortedTransaction.<init>(FetchResponseData.java:1904)
        at org.apache.kafka.common.message.FetchResponseData$PartitionData.read(FetchResponseData.java:881)
        at org.apache.kafka.common.message.FetchResponseData$PartitionData.<init>(FetchResponseData.java:805)
        at org.apache.kafka.common.message.FetchResponseData$FetchableTopicResponse.read(FetchResponseData.java:524)
        at org.apache.kafka.common.message.FetchResponseData$FetchableTopicResponse.<init>(FetchResponseData.java:464)
        at org.apache.kafka.common.message.FetchResponseData.read(FetchResponseData.java:199)
        at org.apache.kafka.common.message.FetchResponseData.<init>(FetchResponseData.java:136)
        at org.apache.kafka.common.requests.FetchResponse.parse(FetchResponse.java:119)
        at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:117)
        at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:720)
        ... 17 more {code}
The exception was thrown only once; after that the consumer stopped fetching 
from the node, and the request rate to one of the Kafka brokers dropped to 0:
!image-2024-07-15-15-35-12-489.png|width=601,height=233!

 

After going through the KafkaConsumer code: every time the KafkaConsumer 
prepares fetch requests for the Kafka brokers, it checks whether the target 
broker is already present in {*}nodesWithPendingFetchRequests{*}. If it is, 
that broker is skipped for the current round.

[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L433]
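
For reference, the check looks roughly like this (a simplified paraphrase of the 3.7.1 source linked above, not a verbatim copy):
{code:java}
// AbstractFetch, simplified: when selecting fetch targets, a node that still
// has an in-flight fetch request is skipped for this round.
if (nodesWithPendingFetchRequests.contains(node.id())) {
    log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
} else {
    // ... otherwise build the fetch request for this node and mark it pending
}
{code}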

The broker id is removed from that set only once the response has completed.

[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L600]
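
That removal happens in the fetch response callback, and NetworkClient only invokes callbacks from {*}completeResponses{*} (paraphrased from the 3.7.1 source, simplified):
{code:java}
// NetworkClient.completeResponses(), roughly: this is the only place the
// request callback fires, and for fetches that callback is what removes the
// node id from nodesWithPendingFetchRequests.
private void completeResponses(List<ClientResponse> responses) {
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}
{code}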

But in this case the exception was thrown from {*}handleCompletedReceives{*}, 
before *completeResponses* ever ran, which means the node id will never be 
removed from {*}nodesWithPendingFetchRequests{*}.

[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L594]
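
Inside *handleCompletedReceives* the response body is parsed before any ClientResponse is queued, so a SchemaException propagates straight out of *poll* (again a simplified paraphrase, not the exact source):
{code:java}
// NetworkClient.handleCompletedReceives(), simplified: parseResponse() runs
// while draining completed receives; if it throws (e.g. on buffer underflow),
// the exception escapes poll() before completeResponses() ever runs.
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        AbstractResponse response = parseResponse(receive.payload(), req.header); // <-- throws
        // ... the ClientResponse is only added to `responses` after parsing succeeds
    }
}
{code}
The handler ordering in *poll* shows why *completeResponses* is never reached: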

 
{code:java}
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow); // SchemaException thrown here escapes poll()
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutConnections(responses, updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses); // never reached on that path, so the node id is never cleared {code}
So each time the Kafka Connect framework calls the *poll* method of the 
MirrorSourceTask, the KafkaConsumer skips fetching from the node id that was 
left behind in {*}nodesWithPendingFetchRequests{*}.

This makes the MirrorMaker task stop mirroring data with only a single WARN 
log.
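
The failure mode can be illustrated with a minimal, self-contained sketch (hypothetical names throughout; this is not Kafka source code):
{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo: once a "pending" entry survives a throwing poll, the
// node is skipped on every later fetch attempt, exactly like the stuck task.
public class PendingFetchLeakDemo {
    private final Set<Integer> nodesWithPendingFetchRequests = new HashSet<>();

    void maybeSendFetch(int nodeId) {
        if (nodesWithPendingFetchRequests.contains(nodeId)) {
            System.out.println("Skipping node " + nodeId + ": previous fetch still pending");
            return;
        }
        nodesWithPendingFetchRequests.add(nodeId);
        System.out.println("Sent fetch to node " + nodeId);
    }

    // Mirrors NetworkClient.poll(): parsing (handleCompletedReceives) happens
    // before completion (completeResponses), so a parse failure skips cleanup.
    void poll(int nodeId, boolean responseIsCorrupt) {
        if (responseIsCorrupt)
            throw new RuntimeException("Buffer underflow while parsing response");
        nodesWithPendingFetchRequests.remove(nodeId); // the completeResponses step
    }

    public static void main(String[] args) {
        PendingFetchLeakDemo demo = new PendingFetchLeakDemo();
        demo.maybeSendFetch(1);          // fetch sent, node 1 marked pending
        try {
            demo.poll(1, true);          // corrupt response: parse throws
        } catch (RuntimeException e) {
            System.out.println("WARN Failure during poll. " + e.getMessage());
        }
        demo.maybeSendFetch(1);          // skipped now...
        demo.maybeSendFetch(1);          // ...and on every poll after that
    }
}
{code}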

> MirrorSourceTask will stop mirroring when get BufferUnderflowException
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-17139
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17139
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect, mirrormaker
>    Affects Versions: 3.0.0, 3.5.2, 3.6.2, 3.7.1
>            Reporter: Yu Wang
>            Priority: Major
>         Attachments: image-2024-07-15-15-35-12-489.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
