[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673706#comment-16673706
 ] 

Sebastian Puzoń commented on KAFKA-7531:
----------------------------------------

Hi Jason,

here's commit id from broker: _INFO Kafka commitId : 3402a8361b734732 
(org.apache.kafka.common.utils.AppInfoParser)_

FYI, the NPE issue may be related to session timeout discrepancy between 
broker/client, On the broker I've had 20s whereas on the client side it was 
60s. I'm pretty sure this was also a reason of Streams thread leak on the 
application side that I could also observe, it was slow but eventually all 
streams threads in JVM have been shut down. I'm guessing ... maybe this leads 
to NPE issue describe in this ticket?

Since I corrected session settings for broker/stream app I don't observe stream 
thread leak in client app, I didn't have time to re-test yet, I got NPE after 
few hours, I will try next week.

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-7531
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7531
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 2.0.0
>            Reporter: Sebastian Puzoń
>            Priority: Critical
>             Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams Application 4 instances, each has 5 Streams threads, total 20 stream 
> threads.
> I observe NPE NullPointerException at coordinator broker which causes all 
> application stream threads shutdown, here's stack from broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:27,531] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6804-StreamThread-4-consumer-ae1f00c2-7c2c-4f8e-bed4-20a955ecc122
>  in group elog_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator){code}
>  
>  
> On the application side I can see such stack trace:
>  
>  
> {code:java}
> 2018-10-22 21:52:15 AssignedStreamsTasks [ERROR] stream-thread 
> [elog_agg-client-sswvlp6802-StreamThread-4] Failed to commit stream task 0_9 
> due to the following error:
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The server experienced an unexpected error when processing the request
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> at java.lang.Thread.run(Thread.java:745)
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread 
> [elog_agg-client-sswvlp6802-StreamThread-4] State transition from RUNNING to 
> PENDING_SHUTDOWN
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread 
> [elog_agg-client-sswvlp6802-StreamThread-4] Shutting down
> 2018-10-22 21:52:15 KafkaProducer [INFO] [Producer 
> clientId=elog_agg-client-sswvlp6802-StreamThread-4-0_17-producer, 
> transactionalId=elog_agg-0_17] Closing the Kafka producer with timeoutMillis 
> = 9223372036854775807 ms.
> 2018-10-22 21:52:16 AssignedStreamsTasks [ERROR] stream-thread 
> [elog_agg-client-sswvlp6802-StreamThread-4] Failed while closing StreamTask 
> 0_9 due to the following error:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method 
> because we are in an error state
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:563)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Caused by: org.apache.kafka.common.KafkaException: Unhandled error in 
> EndTxnResponse: The server experienced an unexpected error when processing 
> the request
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> {code}
> This way all streams application threads are being shutdown.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to