[ https://issues.apache.org/jira/browse/KAFKA-8933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969309#comment-16969309 ]
Mickael Maison commented on KAFKA-8933:
---------------------------------------

We saw a similar exception in 2.4/2.5:
{code:java}
Nov 5 18:10:26 mirrormaker2-6c5bbffffc-jx85h mirrormaker2 ERROR [MirrorSourceConnector|task-0] Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask:159)
java.lang.NullPointerException
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1071)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:847)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:303)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1249)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:259)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source){code}

Prior to the exception, the client was disconnected. In NetworkClient.processDisconnection() several paths can lead to inProgressRequestVersion being set to null. If there's a MetadataRequest in flight to another node at the time, then the exception is hit when handling the response, because inProgressRequestVersion is unboxed to an int.

I was able to reproduce reliably with the following setup:
- 2 brokers with SASL
- consumer consuming from broker 0
- make broker 0 drop the consumer connection
- the consumer gets:
{code:java}
INFO [main]: [Consumer clientId=consumer-2, groupId=null] Error sending fetch request (sessionId=735517, epoch=2) to node 0: {}.
org.apache.kafka.common.errors.DisconnectException{code}
- make broker 0 fail authentication
- the consumer gets:
{code:java}
INFO [main]: [Consumer clientId=consumer-2, groupId=null] Failed authentication with localhost/127.0.0.1 (Authentication failed, invalid credentials)
ERROR [main]: [Consumer clientId=consumer-2, groupId=null] Connection to node 0 (localhost/127.0.0.1:9093) failed authentication due to: Authentication failed, invalid credentials{code}
- force a metadata refresh by the consumer, for example: consumer.partitionsFor("topic-that-does-not-exist");
- the consumer gets:
{code:java}
java.lang.NullPointerException
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1073)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:847)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:368)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1930)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1898)
    at main.ConsumerTest2.main(ConsumerTest2.java:37)
{code}

I'm not super familiar with this code path but the following patch helped:
{code:java}
diff --git clients/src/main/java/org/apache/kafka/clients/NetworkClient.java clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d782df865..d3119f132 100644
--- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1067,6 +1069,9 @@ public class NetworkClient implements KafkaClient {
             if (response.brokers().isEmpty()) {
                 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                 this.metadata.failedUpdate(now, null);
+            } else if (inProgressRequestVersion == null) {
+                log.warn("Ignoring metadata response ...");
+                this.metadata.failedUpdate(now, null);
             } else {
                 this.metadata.update(inProgressRequestVersion, response, now);
             }
{code}

> An unhandled SSL handshake exception in polling event - needed a retry logic
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-8933
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8933
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.2.1
>         Environment: software platform
>            Reporter: Remigius
>            Priority: Critical
>
> Already client is connected and during polling event, SSL handshake failure
> happened. it led to leaving the co-ordinator. Even on SSL handshake failure
> which was actually intermittent issue, polling should have some resilient and
> retry the polling. Leaving group caused all instances of clients to drop and
> left the messages in Kafka for long time until re-subscribe the kafka topic
> manually.
>
>
> {noformat}
> 2019-09-06 04:03:09,016 ERROR [reactive-kafka-xxxx] org.apache.kafka.clients.NetworkClient [Consumer clientId=aaa, groupId=bbb] Connection to node 150 (host:port) failed authentication due to: SSL handshake failed
> 2019-09-06 04:03:09,021 ERROR [reactive-kafka-xxxx] reactor.kafka.receiver.internals.DefaultKafkaReceiver Unexpected exception
> java.lang.NullPointerException: null
>     at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1012) ~[kafka-clients-2.2.1.jar!/:?]
>     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:822) ~[kafka-clients-2.2.1.jar!/:?]
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544) ~[kafka-clients-2.2.1.jar!/:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.2.1.jar!/:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.2.1.jar!/:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256) ~[kafka-clients-2.2.1.jar!/:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar!/:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar!/:?]
>     at reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent.run(DefaultKafkaReceiver.java:470) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
>     at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:401) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
>     at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$14(DefaultKafkaReceiver.java:335) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
>     at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>     at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>     at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>     at reactor.kafka.receiver.internals.KafkaSchedulers$EventScheduler.lambda$decorate$1(KafkaSchedulers.java:100) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
>     at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>     at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>     at org.springframework.cloud.sleuth.instrument.async.TraceCallable.call(TraceCallable.java:70) ~[spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:834) [?:?]
> 2019-09-06 04:03:09,023 INFO [reactive-kafka-xxxx] org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=aaa, groupId=bbb] Member x_13-081e61ec-1509-4e0e-819e-58063d1ce8f6 sending LeaveGroup request to coordinator{noformat}
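
For reference, the failure mode described in the comment (a null boxed Integer being unboxed to an int, and a null check in front of it) can be reproduced in isolation. The following is only a minimal standalone sketch of that pattern, not the actual NetworkClient code: the class MetadataVersionGuard and all of its methods are hypothetical names introduced for illustration, and only the field name inProgressRequestVersion is taken from the comment.

```java
// Hypothetical, simplified stand-in for a metadata updater.
// This is NOT the real org.apache.kafka.clients.NetworkClient.
class MetadataVersionGuard {
    // Boxed so that "no request in progress" can be represented as null,
    // like the inProgressRequestVersion field mentioned in the comment.
    private Integer inProgressRequestVersion;

    // A metadata request was sent; remember which version it belongs to.
    void requestSent(int version) {
        inProgressRequestVersion = version;
    }

    // A disconnect clears the in-progress marker.
    void disconnected() {
        inProgressRequestVersion = null;
    }

    // Unguarded handling: passing a null Integer to an int parameter
    // triggers auto-unboxing and throws NullPointerException.
    String handleResponseUnguarded() {
        return update(inProgressRequestVersion);
    }

    // Guarded handling: same idea as the null check in the patch,
    // the response is ignored when no request version is recorded.
    String handleResponseGuarded() {
        if (inProgressRequestVersion == null) {
            return "ignored";
        }
        return update(inProgressRequestVersion);
    }

    // Takes a primitive int, so callers holding an Integer must unbox.
    private static String update(int requestVersion) {
        return "updated with request version " + requestVersion;
    }

    public static void main(String[] args) {
        MetadataVersionGuard updater = new MetadataVersionGuard();
        updater.requestSent(7);
        updater.disconnected(); // response arrives after the disconnect

        System.out.println(updater.handleResponseGuarded());
        try {
            updater.handleResponseUnguarded();
        } catch (NullPointerException e) {
            System.out.println("unguarded path threw NullPointerException");
        }
    }
}
```

The guarded variant trades a crash for a skipped update, which is the same trade-off the patch makes by calling failedUpdate instead of update; whether skipping is the right behaviour for the real client is a question for the actual fix.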