alex gabriel created KAFKA-8206:
-----------------------------------

             Summary: A consumer can't discover new group coordinator when the 
cluster was partly restarted
                 Key: KAFKA-8206
                 URL: https://issues.apache.org/jira/browse/KAFKA-8206
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.2.0, 2.0.0, 1.0.0
            Reporter: alex gabriel


*A consumer can't discover new group coordinator when the cluster was partly 
restarted*

Preconditions:
I use Kafka server and the Java kafka-clients lib, version 2.2.
I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 
ZK (localhost:2181/localhost:2181).
I have replication factor 2 for all my topics and 
'_unclean.leader.election.enable=true_' on both Kafka nodes.
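
For reference, the consumer side of this setup can be sketched as plain properties. This is only an illustration: the bootstrap list, group id and client id are taken from this report and its logs, while the class name and the String deserializers are assumptions, since the report does not say which deserializers were used.

```java
import java.util.Properties;

/** Hypothetical helper that collects the consumer settings used in this report. */
final class ConsumerConfigSketch {

    static Properties consumerProperties() {
        Properties props = new Properties();
        // Both brokers are listed, so either one can serve the initial metadata request.
        props.setProperty("bootstrap.servers", "localhost:9092,localhost:9093");
        // Group id and client id as they appear in the log lines of this report.
        props.setProperty("group.id", "events-group-gabriel");
        props.setProperty("client.id", "events-consumer0");
        // Assumption: String keys and values; the report does not state the deserializers.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // With kafka-clients on the classpath these properties would be passed to
        // new KafkaConsumer<>(props); here they are only printed.
        System.out.println(consumerProperties().getProperty("bootstrap.servers"));
    }
}
```

Note that 'unclean.leader.election.enable' is a broker-side setting and therefore does not appear among the consumer properties.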

Steps to reproduce:

1) Start 2 nodes (localhost:9092/localhost:9093)
2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
{noformat}
// discovered group coordinator (0-node)
2019-04-09 16:23:18,963 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>

// metadata cache is updated (2 nodes in the cluster list)
2019-04-09 16:23:18,928 DEBUG 
[org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
[Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending 
metadata request (type=MetadataRequest, topics=<ALL>) to node localhost:9092 
(id: -1 rack: null)>
2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), 
localhost:9093 (id: 1 rack: null)], partitions = [], controller = 
localhost:9092 (id: 0 rack: null))}>
{noformat}
3) Shutdown 1-node (localhost:9093)
{noformat}
// metadata was updated to version 4 (but for some reason it still listed 2 
alive nodes in the cluster)
2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), 
localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = 
events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], 
offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader = 
0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = 
localhost:9092 (id: 0 rack: null))}>

// the consumer thinks that 1-node is still alive and tries to send a coordinator 
lookup to it, but fails
2019-04-09 16:23:46,981 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
2019-04-09 16:23:46,981 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or 
invalid, will attempt rediscovery>
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
2019-04-09 16:24:01,117 WARN 
[org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
(localhost:9093) could not be established. Broker may not be available.>

// refreshing metadata again
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled 
request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, 
clientId=events-consumer0, correlationId=112) due to node 1 being disconnected>
2019-04-09 16:24:01,117 DEBUG 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Coordinator discovery failed, refreshing metadata>

// metadata was updated to version 5, where the cluster had only 0-node 
localhost:9092, as expected.
2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - 
Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = 
P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], partitions 
= [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = 
[0,1], isr = [0], offlineReplicas = [1]), Partition(topic = events-sorted, 
partition = 0, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = 
[1])], controller = localhost:9092 (id: 0 rack: null))}>

// 0-node discovered as coordinator
2019-04-09 16:24:01,132 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
{noformat}
*At this point the consumer stores only information about 0-node (localhost:9092) 
in the cluster property of the metadata cache.*

4) Shutdown 0-node (localhost:9092)
5) Start 1-node (localhost:9093)
{noformat}
// the consumer tries to reconnect only to the 0-node
2019-04-09 16:24:40,649 DEBUG 
[org.apache.kafka.common.network.Selector.pollSelectionKeys] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Connection with 
localhost disconnected>
2019-04-09 16:24:40,649 DEBUG 
[org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Node 0 disconnected.>
2019-04-09 16:24:40,649 DEBUG 
[org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Node 2147483647 
disconnected.>
2019-04-09 16:24:40,649 DEBUG 
[org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled 
request with header RequestHeader(apiKey=FETCH, apiVersion=10, 
clientId=events-consumer0, correlationId=209) due to node 0 being disconnected>
2019-04-09 16:24:40,649 INFO 
[org.apache.kafka.clients.FetchSessionHandler.handleError] - [Consumer 
clientId=events-consumer0, groupId=events-group-gabriel] Error sending fetch 
request (sessionId=1754516055, epoch=132) to node 0: 
org.apache.kafka.common.errors.DisconnectException.>
2019-04-09 16:24:40,650 INFO 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or 
invalid, will attempt rediscovery>
2019-04-09 16:24:40,650 DEBUG 
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator.lookupCoordinator]
 - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] No broker 
available to send FindCoordinator request>
2019-04-09 16:24:40,650 DEBUG 
[org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
[Consumer clientId=events-consumer0, groupId=events-group-gabriel] Give up 
sending metadata request since no node is available>
{noformat}
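
A note on reading the logs above: the coordinator ids 2147483647 and 2147483646 do not denote additional brokers. The values are consistent with the Java client deriving the coordinator's connection id as Integer.MAX_VALUE minus the broker id, so that coordinator requests get a connection separate from fetch traffic. A minimal sketch of that arithmetic follows; the class and method names are hypothetical and this is not the client's actual code.

```java
/** Hypothetical helper for decoding the coordinator ids seen in the logs. */
final class CoordinatorIds {

    /** Connection id used for the coordinator hosted on the given broker. */
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    /** Inverse mapping: which broker a coordinator connection id refers to. */
    static int brokerIdOf(int coordinatorConnectionId) {
        return Integer.MAX_VALUE - coordinatorConnectionId;
    }

    public static void main(String[] args) {
        System.out.println(brokerIdOf(2147483647)); // prints 0 (localhost:9092)
        System.out.println(brokerIdOf(2147483646)); // prints 1 (localhost:9093)
    }
}
```

This is why "Node 0 disconnected" and "Node 2147483647 disconnected" appear together in step 5: both connections point at the same broker, localhost:9092.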
As far as I can see, the consumer tries to connect only to the 0-node because 
it is the only node in the consumer's cluster metadata cache. As a result, the 
consumer can't connect to the new group coordinator (1-node) and can't poll 
any messages.
To resolve this pending state, the 0-node must be restored OR the consumer must be 
restarted (in which case the consumer discovers the new node from the initial 
broker list).
P.S. The same behavior can be reproduced using a 3-node cluster.

_The question: is it by design that the consumer's metadata cache stores only 
the list of active nodes?_
It leads to a situation where the last active node in the list goes down 
and another node comes back up, but the consumer has no information about 
that node and keeps trying to reconnect to the last active node, ignoring the new one.

From my point of view, a consumer should always keep the initial node list 
and try to reconnect to nodes from that list when 
no node in the cached cluster metadata is alive.
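
The suggested fallback can be illustrated with a small standalone model. This is a sketch under stated assumptions, not the Kafka client's implementation: the class, the method and the 'fallbackToBootstrap' flag are hypothetical, and node reachability is simulated with a plain set of addresses.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical model of node selection; not Kafka's actual client code. */
final class BootstrapFallbackSketch {

    /**
     * Picks a node to contact. Nodes from the cached cluster metadata are tried
     * first; if none is reachable and the fallback is enabled, the initial
     * bootstrap list is tried as well.
     */
    static Optional<String> pickNode(List<String> cachedNodes,
                                     List<String> bootstrapNodes,
                                     Set<String> reachable,
                                     boolean fallbackToBootstrap) {
        Optional<String> fromCache =
                cachedNodes.stream().filter(reachable::contains).findFirst();
        if (fromCache.isPresent() || !fallbackToBootstrap) {
            return fromCache;
        }
        return bootstrapNodes.stream().filter(reachable::contains).findFirst();
    }

    public static void main(String[] args) {
        // State after step 5: the cache knows only the 0-node, but only the 1-node is up.
        List<String> bootstrap = List.of("localhost:9092", "localhost:9093");
        List<String> cached = List.of("localhost:9092");
        Set<String> reachable = Set.of("localhost:9093");

        // Behavior described in this report: no candidate node is found.
        System.out.println(pickNode(cached, bootstrap, reachable, false)); // Optional.empty
        // Suggested behavior: the bootstrap list yields the restarted 1-node.
        System.out.println(pickNode(cached, bootstrap, reachable, true));  // Optional[localhost:9093]
    }
}
```

In this model the cached metadata still takes priority, so the bootstrap list is consulted only when every cached node is unreachable.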



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
