[ https://issues.apache.org/jira/browse/KAFKA-8933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969309#comment-16969309 ]

Mickael Maison commented on KAFKA-8933:
---------------------------------------

We saw a similar exception in 2.4/2.5:

{code:java}
Nov 5 18:10:26 mirrormaker2-6c5bbffffc-jx85h mirrormaker2 ERROR [MirrorSourceConnector|task-0] Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask:159)
java.lang.NullPointerException
 at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1071)
 at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:847)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:303)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1249)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
 at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:259)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:226)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
 at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
 at java.util.concurrent.FutureTask.run(Unknown Source)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
 at java.lang.Thread.run(Unknown Source){code}

Prior to the exception, the client was disconnected. In 
NetworkClient.processDisconnection(), several paths can lead to 
inProgressRequestVersion being set to null. If a MetadataRequest is in flight 
to another node at that time, the exception is hit when its response is 
handled, because the null inProgressRequestVersion is unboxed to an int.
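
The failure mode itself is ordinary Java auto-unboxing: passing a null Integer where an int is expected throws a NullPointerException at the call site. A minimal standalone sketch with no Kafka dependency follows. The names UnboxingDemo, update and failsOnNull are hypothetical and only stand in for the call that receives inProgressRequestVersion:

```java
public class UnboxingDemo {
    // Hypothetical stand-in for a method that takes a primitive int version.
    public static int update(int requestVersion) {
        return requestVersion + 1;
    }

    // Returns true if passing the boxed value triggers an NPE during unboxing.
    public static boolean failsOnNull(Integer inProgressRequestVersion) {
        try {
            // The compiler inserts inProgressRequestVersion.intValue() here.
            update(inProgressRequestVersion);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsOnNull(5));    // version present
        System.out.println(failsOnNull(null)); // version cleared, e.g. by a disconnect
    }
}
```

The NPE is raised on the line that makes the call, which would be consistent with the top frame of the trace pointing into handleCompletedMetadataResponse rather than into the callee.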

I was able to reproduce reliably with the following setup:
- 2 brokers with SASL
- consumer consuming from broker 0
- make broker 0 drop consumer connection
- the consumer gets:
{code:java}
INFO [main]: [Consumer clientId=consumer-2, groupId=null] Error sending fetch request (sessionId=735517, epoch=2) to node 0: {}.
org.apache.kafka.common.errors.DisconnectException{code}
- make broker 0 fail authentication
- the consumer gets:
{code:java}
INFO [main]: [Consumer clientId=consumer-2, groupId=null] Failed authentication with localhost/127.0.0.1 (Authentication failed, invalid credentials)
ERROR [main]: [Consumer clientId=consumer-2, groupId=null] Connection to node 0 (localhost/127.0.0.1:9093) failed authentication due to: Authentication failed, invalid credentials{code}
- force a metadata refresh on the consumer, for example: consumer.partitionsFor("topic-that-does-not-exist");
- the consumer gets:
{code:java}
java.lang.NullPointerException
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1073)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:847)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:368)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1930)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1898)
    at main.ConsumerTest2.main(ConsumerTest2.java:37)
{code}

I'm not super familiar with this code path, but the following patch helped:

{code:java}
diff --git clients/src/main/java/org/apache/kafka/clients/NetworkClient.java clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d782df865..d3119f132 100644
--- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1067,6 +1069,9 @@ public class NetworkClient implements KafkaClient {
             if (response.brokers().isEmpty()) {
                 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                 this.metadata.failedUpdate(now, null);
+            } else if (inProgressRequestVersion == null) {
+                log.warn("Ignoring metadata response ...");
+                this.metadata.failedUpdate(now, null);
             } else {
                 this.metadata.update(inProgressRequestVersion, response, now);
             }
{code}
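
The branch order in the patch can be exercised in isolation. The sketch below is a simplified model, not Kafka code: MetadataUpdaterModel, its fields and the string outcomes are all hypothetical. Only the order of the checks mirrors the diff (empty broker list, then null version, then the normal update):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MetadataUpdaterModel {
    // Boxed on purpose: null models "no request version in progress".
    private Integer inProgressRequestVersion;
    private int failedUpdates = 0;
    private int appliedVersion = -1;

    public void requestSent(int version) {
        inProgressRequestVersion = version;
    }

    // Models a disconnect path that clears the in-progress version.
    public void disconnected() {
        inProgressRequestVersion = null;
    }

    public String handleResponse(List<String> brokers) {
        if (brokers.isEmpty()) {
            failedUpdates++;
            return "ignored-empty";
        } else if (inProgressRequestVersion == null) {
            // Added guard: without it, the unboxing below would throw an NPE.
            failedUpdates++;
            return "ignored-no-version";
        } else {
            appliedVersion = inProgressRequestVersion; // safe unboxing
            return "updated";
        }
    }

    public int failedUpdates() { return failedUpdates; }

    public int appliedVersion() { return appliedVersion; }

    public static void main(String[] args) {
        MetadataUpdaterModel updater = new MetadataUpdaterModel();
        updater.requestSent(7);
        updater.disconnected(); // version cleared while a response is in flight
        System.out.println(updater.handleResponse(Arrays.asList("broker-1")));
        System.out.println(updater.handleResponse(Collections.<String>emptyList()));
    }
}
```

In this model a response that arrives after the version was cleared is counted as a failed update instead of crashing the poll loop, which is the behaviour the patch aims for.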

 

> An unhandled SSL handshake exception in polling event - needed a retry logic
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-8933
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8933
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.2.1
>         Environment: software platform
>            Reporter: Remigius
>            Priority: Critical
>
> The client was already connected when an SSL handshake failure occurred 
> during a poll, which led to the consumer leaving the coordinator. Because the 
> SSL handshake failure was intermittent, polling should be resilient and retry. 
> Leaving the group caused all client instances to drop out and left the 
> messages in Kafka for a long time, until the Kafka topic was re-subscribed 
> manually.
>  
>  
> {noformat}
> 2019-09-06 04:03:09,016 ERROR [reactive-kafka-xxxx] org.apache.kafka.clients.NetworkClient [Consumer clientId=aaa, groupId=bbb] Connection to node 150 (host:port) failed authentication due to: SSL handshake failed
> 2019-09-06 04:03:09,021 ERROR [reactive-kafka-xxxx] reactor.kafka.receiver.internals.DefaultKafkaReceiver Unexpected exception
> java.lang.NullPointerException: null
>  at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1012) ~[kafka-clients-2.2.1.jar!/:?]
>  at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:822) ~[kafka-clients-2.2.1.jar!/:?]
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544) ~[kafka-clients-2.2.1.jar!/:?]
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.2.1.jar!/:?]
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.2.1.jar!/:?]
>  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256) ~[kafka-clients-2.2.1.jar!/:?]
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200) ~[kafka-clients-2.2.1.jar!/:?]
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) ~[kafka-clients-2.2.1.jar!/:?]
>  at reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent.run(DefaultKafkaReceiver.java:470) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
>  at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:401) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
>  at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$14(DefaultKafkaReceiver.java:335) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
>  at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>  at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>  at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>  at reactor.kafka.receiver.internals.KafkaSchedulers$EventScheduler.lambda$decorate$1(KafkaSchedulers.java:100) ~[reactor-kafka-1.1.1.RELEASE.jar!/:1.1.1.RELEASE]
>  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.2.10.RELEASE.jar!/:3.2.10.RELEASE]
>  at org.springframework.cloud.sleuth.instrument.async.TraceCallable.call(TraceCallable.java:70) ~[spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
>  at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>  at java.lang.Thread.run(Thread.java:834) [?:?]
> 2019-09-06 04:03:09,023 INFO  [reactive-kafka-xxxx] org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=aaa, groupId=bbb] Member x_13-081e61ec-1509-4e0e-819e-58063d1ce8f6 sending LeaveGroup request to coordinator{noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
