[ https://issues.apache.org/jira/browse/KAFKA-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Calcote updated KAFKA-7378:
--------------------------------
    Description:
I have an integration test that creates a Spring context, starting up the application. In the test's @Before method, I create a zookeeper and KafkaServer instance with available ephemeral service ports. The test's @After method shuts down these service instances. The Spring context registers a shutdown handler, which doesn't get called till after the @After method is run. This means my application's consumer polling loop doesn't detect a termination event until after the broker is gone. Here's what shows up in my log4j log:

{noformat}
2018-09-05 11:56:13.980,1819313073756610 {} DEBUG o.a.k.c.NetworkClient [hmdb-kafka-polling-thread] Initiating connection to node localhost:9092 (id: -1 rack: null)
2018-09-05 11:56:13.980,1819313074091635 {} DEBUG o.a.k.c.n.Selector [hmdb-kafka-polling-thread] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_171]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_171]
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:361) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:226) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:203) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) ~[kafka-clients-0.11.0.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) ~[kafka-clients-0.11.0.1.jar:?]
{noformat}

This sequence continues forever because poll will not return and notice it's time to terminate my polling loop.

> Consumer poll hangs if broker shutdown while consumer attempting to connect.
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-7378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7378
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.11.0.1
>         Environment: Java JDK 8, Linux RedHat 7.5
>            Reporter: John Calcote
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
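
The hang described in this report comes down to a polling loop that only checks its termination flag between poll calls, so a poll that never returns is never asked to stop. A minimal sketch of the usual shape of a fix follows, written against the JDK only so it can run without a broker. BlockingClient, WakeupSignal and runAndStop are hypothetical stand-ins, not Kafka classes. With the real client the corresponding calls would be KafkaConsumer.wakeup(), invoked from the shutdown thread, and WakeupException, thrown out of the blocked poll. Whether that is enough to break the specific hang reported here on 0.11.0.1 is an assumption, not something this report confirms.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingLoopSketch {

    /** Hypothetical stand-in for the exception a woken-up poll throws. */
    static class WakeupSignal extends RuntimeException { }

    /** Hypothetical stand-in for a client whose poll() never returns on its own. */
    static class BlockingClient {
        private final Object lock = new Object();
        private boolean wakeupRequested = false;

        /** Blocks until wakeup() is called, then aborts by throwing WakeupSignal. */
        void poll() throws InterruptedException {
            synchronized (lock) {
                while (!wakeupRequested) {
                    lock.wait();
                }
                wakeupRequested = false;
            }
            throw new WakeupSignal();
        }

        /** Safe to call from another thread; releases a blocked or upcoming poll(). */
        void wakeup() {
            synchronized (lock) {
                wakeupRequested = true;
                lock.notifyAll();
            }
        }
    }

    /** Starts a polling thread, requests shutdown, and reports whether the loop exited. */
    static boolean runAndStop() {
        BlockingClient client = new BlockingClient();
        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch exited = new CountDownLatch(1);

        Thread poller = new Thread(() -> {
            try {
                while (running.get()) {
                    client.poll(); // would hang forever without wakeup()
                }
            } catch (WakeupSignal e) {
                // Expected during shutdown: fall through and leave the loop.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                exited.countDown();
            }
        }, "polling-thread");
        poller.start();

        // Shutdown path: set the flag first, then wake the blocked poll.
        running.set(false);
        client.wakeup();

        try {
            return exited.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("polling loop exited: " + runAndStop());
    }
}
```

The flag alone is not enough in this sketch, because the loop only reads it between polls. The shutdown path therefore sets the flag and then calls wakeup(), so a poll that is already blocked is forced to return control to the loop. In the test described above, that shutdown path would have to run before the @After method stops the broker, or be triggered from it.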