[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16677148#comment-16677148
 ] 

Jason Gustafson commented on KAFKA-7531:
----------------------------------------

[~spuzon] Thanks for the extra info. Yeah, let us know if you can reproduce it.
Just looking at the trace, it's tough to tell what is causing the NPE. Which
timeout are you referring to specifically?

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-7531
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7531
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 2.0.0
>            Reporter: Sebastian Puzoń
>            Priority: Critical
>             Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 ZooKeeper node.
> Streams application: 4 instances, each with 5 stream threads, for a total of
> 20 stream threads.
> I observe a NullPointerException on the coordinator broker, which causes all of
> the application's stream threads to shut down. Here is the stack trace from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance group elog_agg with old generation 49 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group elog_agg generation 50 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from leader for group elog_agg for generation 50 (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on partition __transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=true} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:27,531] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6804-StreamThread-4-consumer-ae1f00c2-7c2c-4f8e-bed4-20a955ecc122 in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator){code}
>
> On the application side, I see the following stack trace:
>
> {code:java}
> 2018-10-22 21:52:15 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed to commit stream task 0_9 due to the following error:
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
> at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
> at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> at java.lang.Thread.run(Thread.java:745)
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Shutting down
> 2018-10-22 21:52:15 KafkaProducer [INFO] [Producer clientId=elog_agg-client-sswvlp6802-StreamThread-4-0_17-producer, transactionalId=elog_agg-0_17] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-10-22 21:52:16 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed while closing StreamTask 0_9 due to the following error:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
> at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
> at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679)
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:563)
> at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
> at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
> at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
> at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Caused by: org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
> at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
> at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> {code}
> As a result, all of the Streams application's threads are shut down.
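For anyone trying to reproduce this, the setup described in the report can be sketched as follows. This is a minimal sketch under stated assumptions, not the reporter's code: it assumes `processing.guarantee` is `exactly_once` (suggested by the transactional IDs in the broker log, not stated in the report), and it assumes each task's producer uses `<application.id>-<taskId>` as its `transactional.id`, which matches IDs such as `elog_agg-0_3` and `elog_agg-0_14` above. It uses plain `java.util.Properties` with the standard config key strings so it compiles without the Kafka jars; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of the setup described in KAFKA-7531.
// Uses plain java.util.Properties so it compiles without the Kafka jars.
public class ElogAggSetup {

    // Per-instance Streams settings implied by the report.
    // Assumption: exactly-once is enabled (inferred from the transactional IDs in the logs).
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "elog_agg");
        props.setProperty("num.stream.threads", "5");
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    // One task per input partition of the single sub-topology (group 0),
    // so task IDs run from "0_0" to "0_(partitions-1)".
    // Assumption: each task's producer uses "<application.id>-<taskId>" as transactional.id.
    static List<String> transactionalIds(String applicationId, int partitions) {
        List<String> ids = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            ids.add(applicationId + "-0_" + p);
        }
        return ids;
    }

    public static void main(String[] args) {
        int instances = 4;
        Properties props = streamsConfig();
        int threadsPerInstance = Integer.parseInt(props.getProperty("num.stream.threads"));
        List<String> ids = transactionalIds(props.getProperty("application.id"), 20);
        System.out.println("total stream threads: " + instances * threadsPerInstance);
        System.out.println("transactional IDs: " + ids.size() + ", e.g. " + ids.get(3));
    }
}
```

Running `main` prints `total stream threads: 20` and `transactional IDs: 20, e.g. elog_agg-0_3`, i.e. one transactional producer per task for the 20-partition topic.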



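The application-side log shows a cascade: the failed EndTxn request surfaces as a `KafkaException` during commit, and when the thread then closes task 0_9, `abortTransaction()` fails in `TransactionManager.maybeFailWithError` because the producer is already in an error state. The toy model below is hypothetical and far simpler than the real `TransactionManager`; it reproduces only that sticky-error behavior, where one unhandled failure causes every later transactional call to be rejected.

```java
// Hypothetical toy model, NOT Kafka's TransactionManager: it only mimics the
// "sticky" error state visible in the application log.
public class StickyTxnErrorModel {
    private RuntimeException lastError; // null while the producer is healthy

    // Simulates commitTransaction(); brokerFails mimics the unknown server
    // error returned for the EndTxn request.
    void commitTransaction(boolean brokerFails) {
        maybeFailWithError();
        if (brokerFails) {
            lastError = new RuntimeException("Unhandled error in EndTxnResponse: "
                    + "The server experienced an unexpected error when processing the request");
            throw lastError;
        }
    }

    // Simulates abortTransaction(), which the stream thread calls while closing the task.
    void abortTransaction() {
        maybeFailWithError();
    }

    // Once an error has been recorded, every transactional call is rejected,
    // with the original failure attached as the cause.
    private void maybeFailWithError() {
        if (lastError != null) {
            throw new IllegalStateException(
                    "Cannot execute transactional method because we are in an error state", lastError);
        }
    }

    public static void main(String[] args) {
        StickyTxnErrorModel producer = new StickyTxnErrorModel();
        try {
            producer.commitTransaction(true); // the broker-side NPE surfaces here
        } catch (RuntimeException e) {
            System.out.println("commit failed: " + e.getMessage());
        }
        try {
            producer.abortTransaction(); // attempted while closing the task
        } catch (IllegalStateException e) {
            System.out.println("close failed: " + e.getMessage());
        }
    }
}
```

In the model, as in the log, the abort attempted during close fails with the original EndTxn failure attached as its cause, so the task cannot clean up its open transaction.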
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
