[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690155#comment-16690155
 ] 

Guozhang Wang commented on KAFKA-7531:
--------------------------------------

[~spuzon] I cannot recall there is a config named `session.timeout.ms` on the 
broker side, there are only

`group.min.session.timeout.ms` and `group.max.session.timeout.ms`, and the 
other is `zookeeper.session.timeout.ms` which should be irrelevant.

I looked at the source code carefully around the place you pointed out 
(TransactionCoordinator.scala:393 - 398) but cannot find any obvious link, if 
you can edit the code (seem you can since you are running out of source code) 
could you add some log entry around this piece and see which object actually 
throws the NPE?

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-7531
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7531
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 2.0.0
>            Reporter: Sebastian Puzoń
>            Priority: Critical
>             Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams Application 4 instances, each has 5 Streams threads, total 20 stream 
> threads.
> I observe NPE NullPointerException at coordinator broker which causes all 
> application stream threads shutdown, here's stack from broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:27,531] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6804-StreamThread-4-consumer-ae1f00c2-7c2c-4f8e-bed4-20a955ecc122
>  in group elog_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator){code}
>  
>  
> On the application side I can see such stack trace:
>  
>  
> {code:java}
> 2018-10-22 21:52:15 AssignedStreamsTasks [ERROR] stream-thread 
> [elog_agg-client-sswvlp6802-StreamThread-4] Failed to commit stream task 0_9 
> due to the following error:
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
> The server experienced an unexpected error when processing the request
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> at java.lang.Thread.run(Thread.java:745)
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread 
> [elog_agg-client-sswvlp6802-StreamThread-4] State transition from RUNNING to 
> PENDING_SHUTDOWN
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread 
> [elog_agg-client-sswvlp6802-StreamThread-4] Shutting down
> 2018-10-22 21:52:15 KafkaProducer [INFO] [Producer 
> clientId=elog_agg-client-sswvlp6802-StreamThread-4-0_17-producer, 
> transactionalId=elog_agg-0_17] Closing the Kafka producer with timeoutMillis 
> = 9223372036854775807 ms.
> 2018-10-22 21:52:16 AssignedStreamsTasks [ERROR] stream-thread 
> [elog_agg-client-sswvlp6802-StreamThread-4] Failed while closing StreamTask 
> 0_9 due to the following error:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method 
> because we are in an error state
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:563)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Caused by: org.apache.kafka.common.KafkaException: Unhandled error in 
> EndTxnResponse: The server experienced an unexpected error when processing 
> the request
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> {code}
> This way all streams application threads are being shutdown.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to