[ 
https://issues.apache.org/jira/browse/KAFKA-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-14845:
--------------------------------------
    Description: 
Our production environment faced a case where the registration of a broker 
failed due to the presence of a "conflicting" broker znode in Zookeeper. This 
case is similar to the one fixed by KAFKA-6584, which was induced by the 
Zookeeper bug (or feature) tracked in ZOOKEEPER-2985, still open as of today.

A network partition disrupted communication between the Kafka and Zookeeper 
clusters for about 20% of the brokers in the cluster. One of these brokers was 
not able to re-register with Zookeeper and was excluded from the cluster until 
it was restarted. The broker logs show that the registration failed because of 
a "conflicting" znode write which, in this case, does not exactly match the 
scenario covered by KAFKA-6584.

The sequence of logs on the broker is as follows.

First, a connection is established with Zookeeper node 3.
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
An existing Zookeeper session had expired and, upon reconnection, the Zookeeper 
state change handler was invoked. The creation of the ephemeral znode 
/brokers/ids/18 started on the controller thread.
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient){code}
The client "session" timed out after 6 seconds. Note that the session ID is 0x0 
and that the "{_}Session establishment complete{_}" log is absent: the broker 
appears to have never received or processed the response from the Zookeeper 
node.
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
server in 6000ms for sessionid 0x0, closing socket connection and attempting 
reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
started waiting on a new connection notification.
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient){code}
A new connection was created with Zookeeper node 1. Note that a valid (new) 
session ({{0x1006c6e0b830001}}) was reported by Kafka this time.
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server 
zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 
(org.apache.zookeeper.ClientCnxn){code}
The Kafka ZK client is notified of the connection.
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
The broker sends the request to create the znode {{/brokers/ids/18}}, which 
already exists. The error path implemented for KAFKA-6584 is then followed. 
However, in this case, the session owning the ephemeral node, 
{{0x300000043230ac1}} ({{216172783240153793}}), is different from the last 
active Zookeeper session recorded by the broker. It is also different from the 
current session {{0x1006c6e0b830001}} ({{72176813933264897}}), hence the 
recreation of the broker znode is not attempted.
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
/brokers/ids/18, node already exists and owner '216172783240153793' does not 
match current session '72176813933264897' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
NodeExists
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
        at 
kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
        at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
        at 
kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
        at kafka.controller.KafkaController.process(KafkaController.scala:1853)
        at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
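Session IDs appear in hexadecimal in the Zookeeper logs but in signed decimal in 
the Kafka error message above. The standalone helper below (written for this 
analysis, not part of Kafka; class and method names are ours) converts between 
the two forms. It also extracts the top byte of an ID, assuming the Zookeeper 
server embeds its {{myid}} in the high 8 bits of the session IDs it generates.
{code:java}
/**
 * Standalone helper (not Kafka code): converts ZooKeeper session IDs between
 * the hex form of the ZooKeeper logs and the decimal form of Kafka's errors.
 */
public class SessionIds {

    // "0x300000043230ac1" -> 216172783240153793
    static long fromHex(String hex) {
        String digits = hex.startsWith("0x") ? hex.substring(2) : hex;
        return Long.parseUnsignedLong(digits, 16);
    }

    // 216172783240153793 -> "0x300000043230ac1"
    static String toHex(long sessionId) {
        return "0x" + Long.toHexString(sessionId);
    }

    // Assumption: the creating server stores its myid in the top 8 bits.
    static long serverIdOf(long sessionId) {
        return sessionId >>> 56;
    }

    public static void main(String[] args) {
        long owner = fromHex("0x300000043230ac1");   // owner of /brokers/ids/18
        long current = fromHex("0x1006c6e0b830001"); // broker's current session
        System.out.println(owner + " created by server " + serverIdOf(owner));
        System.out.println(current + " created by server " + serverIdOf(current));
    }
}
{code}
If that assumption holds, the owner session was created by server 3 and the 
current session by server 1, which would be consistent with the first 
connection to zk.3 above (assuming server IDs match the hostnames).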
The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper 
server logs:
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for 
session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
{code}
The ephemeral znode is then deleted and never recreated, and the broker is left 
unregistered. At this point, only a broker restart (or a forced recreation of 
the Zookeeper session) can remediate the situation.

An analysis of the Zookeeper commit logs shows the following sequence of 
transactions, with timestamps from the Zookeeper node:
 - 2023-03-05T16:02:00.973Z: CreateSession [-10] => 0x300000043230ac1
 - 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), SetData(<BrokerInfo>)]
 - 2023-03-05T16:02:21.336Z: CloseSession [-11]

The fix for KAFKA-6584 does not cover this case because here, the session ID is 
never surfaced to, nor recorded by, the ZK client in Kafka (there were no 
lower-level logs to ascertain whether the Netty client ever received any 
response for that session).
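The decision described above can be summarised by the following simplified 
model. It is a hypothetical sketch reconstructed from this description, not the 
actual {{KafkaZkClient$CheckedEphemeral}} code: the {{recordedSession}} 
parameter stands for "the last active session recorded by the broker", and the 
outcome names are ours. Whenever the create succeeded on a session whose ID 
never reached the client, the owner matches neither ID and the check can only 
report a conflict.
{code:java}
/**
 * Hypothetical model of the NODEEXISTS handling described in this issue.
 * Names and structure are illustrative; this is not Kafka's code.
 */
public class EphemeralOwnerCheck {

    enum Outcome { ALREADY_REGISTERED, STALE_OWNER_RECREATE, CONFLICT }

    /**
     * @param ephemeralOwner  ephemeralOwner read from the existing znode's Stat
     * @param currentSession  session ID the client holds now
     * @param recordedSession last session ID the broker recorded (0 if none)
     */
    static Outcome classify(long ephemeralOwner, long currentSession, long recordedSession) {
        if (ephemeralOwner == currentSession) {
            // An earlier attempt on this very session succeeded: nothing to do.
            return Outcome.ALREADY_REGISTERED;
        }
        if (recordedSession != 0 && ephemeralOwner == recordedSession) {
            // Left over by a session the broker knows about: per the
            // description, recreation of the znode would be attempted.
            return Outcome.STALE_OWNER_RECREATE;
        }
        // Owner never surfaced to the client (e.g. its ConnectResponse was
        // lost): indistinguishable from another broker using the same ID.
        return Outcome.CONFLICT;
    }

    public static void main(String[] args) {
        long recorded = 0x1001764b3920000L; // session of the first registration
        long orphan = 0x1001764b3920001L;   // session created, response dropped
        long current = 0x1001764b3920002L;  // session known to the client
        System.out.println(classify(orphan, current, recorded)); // CONFLICT
    }
}
{code}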

As a remediation, the source of identity of the broker (currently conveyed by 
the Zookeeper session ID) could perhaps be added explicitly to the znode data. 
Assuming the Zookeeper multi is atomic, the znode carries the BrokerInfo (or 
any other user data provided with the SetData operation) if and only if it was 
successfully created.
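A minimal sketch of this remediation, under stated assumptions: the broker 
generates a random token (a UUID here) and writes it alongside the BrokerInfo in 
the SetData operation of the multi; on NODEEXISTS, it reads the znode data and 
compares tokens instead of session IDs. The "token|payload" encoding and the 
class and method names are hypothetical and do not reflect Kafka's BrokerInfo 
JSON schema.
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Hypothetical sketch: identify the broker's registration by a token stored
 * in the znode data rather than by the owning session ID. The "token|payload"
 * layout is an illustrative assumption, not Kafka's BrokerInfo format.
 */
public class RegistrationToken {

    // Random token, generated once per broker process in this sketch.
    static String newToken() {
        return UUID.randomUUID().toString();
    }

    // Data to write with the SetData operation of the registration multi.
    static byte[] encode(String token, String brokerInfoJson) {
        return (token + "|" + brokerInfoJson).getBytes(StandardCharsets.UTF_8);
    }

    // On NODEEXISTS: was the existing znode written by this broker process,
    // whichever session ended up owning it?
    static boolean isOwnRegistration(byte[] existingData, String token) {
        String data = new String(existingData, StandardCharsets.UTF_8);
        int sep = data.indexOf('|');
        return sep > 0 && data.substring(0, sep).equals(token);
    }

    public static void main(String[] args) {
        String token = newToken();
        byte[] written = encode(token, "{\"host\":\"localhost\",\"port\":9092}");
        System.out.println(isOwnRegistration(written, token));      // true
        System.out.println(isOwnRegistration(written, newToken())); // false
    }
}
{code}
Because the token is written in the same atomic multi as the creation, a 
matching token shows the znode was created by this broker process even when 
the owning session ID never reached the client. What the broker should do next 
(for instance, wait for the orphaned session to expire and re-create the znode) 
is not decided by this sketch.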
----
*Update:* this can be reproduced with this [automated 
test|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg]. 
The sequence of events produced by the test is the following.

!phoque.png!

*(1)* The Zookeeper client is created by the application. It opens a TCP 
connection, then sends a Connect request, which is processed on the Netty NIO 
thread pool. A CreateSession request is internally enqueued to be handled by 
the synchronous request processor. Once it is processed, a session ID is 
generated and recorded, and that session ID, 0x1001764b3920000, is returned to 
the client.
{code:java}
[2023-03-29 16:22:04,258] INFO Socket connection established, initiating 
session, client: /127.0.0.1:50093, server: localhost/127.0.0.1:2181 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,268] INFO channel is connected: [id: 0x2e3e926b, 
L:/127.0.0.1:50093 - R:localhost/127.0.0.1:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920000 type:createSession cxid:0x0 zxid:0xcc0 txntype:-10 
reqpath:n/a
[2023-03-29 16:22:04,310] INFO Session establishment complete on server 
localhost/127.0.0.1:2181, session id = 0x1001764b3920000, negotiated timeout = 
18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,315] INFO [ZooKeeperClient ZkClient] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
*(2)* The Kafka client built on top of the Zookeeper client is used to register 
the broker. The multiTransaction API of the Zookeeper client is invoked, and a 
multi request containing CreateNode and SetData operations is sent to 
Zookeeper.
{code:java}
[2023-03-29 16:22:04,470] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient)
sessionid:0x1001764b3920000 type:multi cxid:0x1 zxid:0xcc1 txntype:14 
reqpath:n/a
[2023-03-29 16:22:04,560] INFO Stat of the created znode at /brokers/ids/18 is: 
3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265 
(kafka.zk.KafkaZkClient)
[2023-03-29 16:22:04,561] INFO Registered broker 18 at path /brokers/ids/18 
with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 3265 
(kafka.zk.KafkaZkClient){code}
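The Stat printed above can be decoded to tie the znode back to the server log. 
The sketch below assumes the comma-separated order of the Zookeeper {{Stat}} 
record fields (czxid, mzxid, ctime, mtime, version, cversion, aversion, 
ephemeralOwner, dataLength, numChildren, pzxid); the class name is ours.
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

/** Decodes the comma-separated Stat logged by KafkaZkClient (sketch). */
public class StatLine {

    // Assumed field order of ZooKeeper's Stat record.
    static final String[] FIELDS = {
        "czxid", "mzxid", "ctime", "mtime", "version", "cversion",
        "aversion", "ephemeralOwner", "dataLength", "numChildren", "pzxid"
    };

    static Map<String, Long> parse(String line) {
        String[] parts = line.trim().split(",");
        if (parts.length != FIELDS.length) {
            throw new IllegalArgumentException("expected " + FIELDS.length + " fields: " + line);
        }
        Map<String, Long> stat = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            stat.put(FIELDS[i], Long.parseLong(parts[i]));
        }
        return stat;
    }

    public static void main(String[] args) {
        Map<String, Long> stat =
            parse("3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265");
        // Compare with "zxid:0xcc1" and "sessionid:0x1001764b3920000" in the server log.
        System.out.println("czxid = 0x" + Long.toHexString(stat.get("czxid")));
        System.out.println("ephemeralOwner = 0x" + Long.toHexString(stat.get("ephemeralOwner")));
    }
}
{code}
With these values, czxid is 0xcc1, the zxid of the multi transaction in the 
server log, and ephemeralOwner is 0x1001764b3920000, the session that sent it. 
mzxid equals czxid and version is 1 because the SetData operation was committed 
in the same transaction as the creation.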
*(3)* The client generates a Ping request every 6 seconds (read timeout / 2). 
The responses to these pings are not sent back to the client (this is enforced 
by the test; see the "Dropping" keyword in stdout). After 12 seconds (the read 
timeout), the client initiates a new connection.
{code:java}
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe 
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe 
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[2023-03-29 16:22:16,555] WARN Client session timed out, have not heard from 
server in 12001ms for session id 0x1001764b3920000 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,559] WARN Session 0x1001764b3920000 for server 
localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect 
except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,567] INFO channel is disconnected: [id: 0x2e3e926b, 
L:/127.0.0.1:50093 ! R:localhost/127.0.0.1:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
*(4)* A new TCP connection is opened and the Connect request is sent. A delay 
is artificially injected to make the connection request time out (the 
connection timeout is equal to the session timeout in this case).
{code:java}
[2023-03-29 16:22:18,388] INFO Opening socket connection to server 
localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,395] INFO Socket connection established, initiating 
session, client: /[0:0:0:0:0:0:0:1]:50121, server: 
localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,396] INFO channel is connected: [id: 0xc39fba08, 
L:/[0:0:0:0:0:0:0:1]:50121 - R:localhost/[0:0:0:0:0:0:0:1]:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
>>>> CONNECTION DELAY = 18500 ms{code}
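The 6 s, 12 s and 18 s values in these steps follow from the session timeout. 
The sketch below reproduces the arithmetic as we understand it from 
{{org.apache.zookeeper.ClientCnxn}} (an assumption to check against the 
Zookeeper version in use): the read timeout is two thirds of the session 
timeout, the connect timeout is the session timeout divided by the number of 
servers in the connect string, and an idle client pings at roughly half the 
read timeout. The class and method names are ours.
{code:java}
/**
 * Arithmetic of the ZooKeeper client timers, as we read ClientCnxn
 * (assumption; check against the ZooKeeper version in use).
 */
public class ClientTimers {

    // Max silence from the server once the session is established.
    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    // Max silence while the session is not yet established, split across servers.
    static int connectTimeout(int sessionTimeoutMs, int serverCount) {
        return sessionTimeoutMs / serverCount;
    }

    // An idle client pings at roughly half the read timeout.
    static int approxPingInterval(int sessionTimeoutMs) {
        return readTimeout(sessionTimeoutMs) / 2;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 18_000;
        System.out.println("read timeout:    " + readTimeout(sessionTimeoutMs));
        System.out.println("ping interval:   ~" + approxPingInterval(sessionTimeoutMs));
        System.out.println("connect (1 srv): " + connectTimeout(sessionTimeoutMs, 1));
        System.out.println("connect (3 srv): " + connectTimeout(sessionTimeoutMs, 3));
    }
}
{code}
With a single server in the connect string ({{localhost:2181}}), the connect 
timeout equals the 18000 ms session timeout, as noted in step *(4)*. Assuming 
the production connect string lists three servers, the same formula gives the 
6000 ms timeout observed for session 0x0 in the production logs.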
*(5)* After 18 seconds (which is both the session and the connection timeout), 
the server expires the session: an internal closeSession request is generated 
on the server and enqueued to be processed by the synchronous request 
processor.
{code:java}
[2023-03-29 16:22:34,638] INFO Expiring session 0x1001764b3920000, timeout of 
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
sessionid:0x1001764b3920000 type:closeSession cxid:0x0 zxid:0xcc3 txntype:-11 
reqpath:n/a
[2023-03-29 16:22:36,401] WARN Client session timed out, have not heard from 
server in 18005ms for session id 0x1001764b3920000 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:36,401] WARN Session 0x1001764b3920000 for server 
localhost/[0:0:0:0:0:0:0:1]:2181, Closing socket connection. Attempting 
reconnect except it is a SessionExpiredException. 
(org.apache.zookeeper.ClientCnxn){code}
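On the server side, session expiry is not exact. Assuming the bucketed scheme of 
Zookeeper's {{ExpiryQueue}}, every time a session is touched its deadline is 
moved to now + timeout, rounded up to the next multiple of the tick time, and a 
background thread expires whole buckets by submitting closeSession requests to 
the request pipeline. A sketch of the rounding, with a 2000 ms tick time chosen 
for illustration (names are ours):
{code:java}
/**
 * Sketch (assumption: mirrors ExpiryQueue's rounding) of when the server
 * considers a session expired, given its last activity and timeout.
 */
public class ExpiryBuckets {

    // Round up to the next multiple of tickMs; an exact multiple still moves
    // to the following bucket.
    static long roundToNextInterval(long timeMs, long tickMs) {
        return (timeMs / tickMs + 1) * tickMs;
    }

    static long expiryDeadline(long lastActivityMs, long sessionTimeoutMs, long tickMs) {
        return roundToNextInterval(lastActivityMs + sessionTimeoutMs, tickMs);
    }

    public static void main(String[] args) {
        long tick = 2_000;     // illustrative tick time
        long timeout = 18_000; // negotiated session timeout
        for (long lastActivity : new long[] {0, 1_000, 2_500}) {
            System.out.println(lastActivity + " -> " + expiryDeadline(lastActivity, timeout, tick));
        }
    }
}
{code}
The expiry thread only enqueues the closeSession request; the session's 
ephemeral znodes are deleted later, when that request is processed in order 
with the other requests.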
*(6)* In parallel, the client detects the connection timeout and reconnects. A 
new TCP connection is opened, and the client is notified by the server that the 
session 0x1001764b3920000 has expired.
{code:java}
[2023-03-29 16:22:37,972] INFO Opening socket connection to server 
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO Socket connection established, initiating 
session, client: /127.0.0.1:50131, server: localhost/127.0.0.1:2181 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO channel is connected: [id: 0x17aecbd7, 
L:/127.0.0.1:50131 - R:localhost/127.0.0.1:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-29 16:22:37,988] INFO Invalid session 0x1001764b3920000 for client 
/127.0.0.1:50131, probably expired (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-29 16:22:37,989] WARN Unable to reconnect to ZooKeeper service, 
session 0x1001764b3920000 has expired (org.apache.zookeeper.ClientCnxn){code}
*(7)* The client closes its Zookeeper "handle" (roughly, the client in 
Zookeeper terminology).
{code:java}
[2023-03-29 16:22:37,994] INFO EventThread shut down for session: 
0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,996] INFO [ZooKeeperClient ZkClient] Session expired. 
(kafka.zookeeper.ZooKeeperClient)
[2023-03-29 16:22:37,998] INFO [ZooKeeperClient ZkClient] Initializing a new 
session to localhost:2181. (kafka.zookeeper.ZooKeeperClient){code}
*(8)* The client creates a new handle, which initiates a new session with 
Zookeeper. A new TCP connection is opened, and the createSession request is 
propagated to the synchronous request processor. The session is created by the 
server, but the response is never sent back to the client.
{code:java}
[2023-03-29 16:22:37,998] INFO Initiating client connection, 
connectString=localhost:2181 sessionTimeout=18000 
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@649bec2e 
(org.apache.zookeeper.ZooKeeper)
[2023-03-29 16:22:37,999] INFO Opening socket connection to server 
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,005] INFO Socket connection established, initiating 
session, client: /127.0.0.1:50132, server: localhost/127.0.0.1:2181 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,006] INFO channel is connected: [id: 0x435b25a4, 
L:/127.0.0.1:50132 - R:localhost/127.0.0.1:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[ALTERED RQ] sessionid:0x1001764b3920001 type:createSession cxid:0x0 zxid:0xcc4 
txntype:-10 reqpath:n/a
Dropping [Ljava.nio.ByteBuffer;@2a9f4f64{code}
*(9)* An inter-thread signal triggered during the creation of the second 
session on the synchronous request processor is captured by the main 
(application) thread, which starts a broker registration. The client 
immediately sends the multi request to the server over the open TCP 
connection, even before receiving confirmation that the ConnectRequest was 
successfully processed, and without knowing the ID of the current session. The 
server processes the multi request and creates the znode /brokers/ids/18 using 
the session 0x1001764b3920001. The response is not sent to the client, which is 
therefore not made aware that the request has been processed.
{code:java}
[2023-03-29 16:22:38,023] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient)
[ALTERED RQ] sessionid:0x1001764b3920001 type:multi cxid:0x1 zxid:0xcc5 
txntype:14 reqpath:n/a
Dropping 1,3269,0
 org.apache.zookeeper.MultiResponse@99bc5055{code}
*(10)* The connection times out again, since no response is sent to the client 
within 18 seconds. The Zookeeper client library returns a CONNECTIONLOSS error 
for the multi request. The Zookeeper client built on top of it in Kafka retries 
on this type of error, so it waits for a new connection to be established and 
then retries.
{code:java}
[2023-03-29 16:22:56,012] WARN Client session timed out, have not heard from 
server in 18005ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:56,014] WARN Session 0x0 for server localhost/127.0.0.1:2181, 
Closing socket connection. Attempting reconnect except it is a 
SessionExpiredException. (org.apache.zookeeper.ClientCnxn){code}
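Retrying on CONNECTIONLOSS is what turns a lost response into a NODEEXISTS. 
The toy model below (not Kafka or Zookeeper code; names are ours) applies a 
create on an in-memory tree, drops the first response as a lost connection 
would, and retries as described for the Kafka client: the retry observes the 
znode created by the first, successful attempt.
{code:java}
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model (not Kafka or ZooKeeper code): a create applied by the server
 * whose response is lost, followed by a retry on CONNECTIONLOSS.
 */
public class RetryAfterConnectionLoss {

    enum Code { OK, NODEEXISTS, CONNECTIONLOSS }

    // Server-side data tree, reduced to the set of existing paths.
    private final Set<String> tree = new HashSet<>();

    // The server applies the create; the client may still never see the result.
    Code create(String path, boolean responseLost) {
        Code serverResult = tree.add(path) ? Code.OK : Code.NODEEXISTS;
        return responseLost ? Code.CONNECTIONLOSS : serverResult;
    }

    // Retries while the client sees CONNECTIONLOSS; the first
    // lostResponses responses are dropped.
    Code createWithRetry(String path, int lostResponses) {
        for (int attempt = 0; ; attempt++) {
            Code code = create(path, attempt < lostResponses);
            if (code != Code.CONNECTIONLOSS) {
                return code;
            }
            // A real client would wait here until it is connected again.
        }
    }

    public static void main(String[] args) {
        // Prints NODEEXISTS: the first (lost) attempt already created the znode.
        System.out.println(new RetryAfterConnectionLoss().createWithRetry("/brokers/ids/18", 1));
    }
}
{code}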
*(11)* A new connection attempt is made. This time, we allow it to go 
through.
{code:java}
[2023-03-29 16:22:57,490] INFO Opening socket connection to server 
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,494] INFO Socket connection established, initiating 
session, client: /127.0.0.1:50166, server: localhost/127.0.0.1:2181 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,495] INFO channel is connected: [id: 0xa086adad, 
L:/127.0.0.1:50166 - R:localhost/127.0.0.1:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920002 type:createSession cxid:0x0 zxid:0xcc6 txntype:-10 
reqpath:n/a
[2023-03-29 16:22:57,522] INFO Session establishment complete on server 
localhost/127.0.0.1:2181, session id = 0x1001764b3920002, negotiated timeout = 
18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,522] INFO [ZooKeeperClient ZkClient] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
*(12)* The multi request is retried and the response NODEEXISTS is received. 
Kafka then sends a getData request to Zookeeper to find the ephemeral owner of 
the znode. The ephemeral owner is 0x1001764b3920001 (72083315314786305), which 
matches neither the current session 0x1001764b3920002 (72083315314786306) nor 
the session previously recorded by the Kafka client during the first znode 
creation (0x1001764b3920000).
{code:java}
sessionid:0x1001764b3920002 type:multi cxid:0x2 zxid:0xcc7 txntype:14 
reqpath:n/a
sessionid:0x1001764b3920002 type:getData cxid:0x3 zxid:0xfffffffffffffffe 
txntype:unknown reqpath:/brokers/ids/18
[2023-03-29 16:22:57,539] ERROR Error while creating ephemeral at 
/brokers/ids/18, node already exists and owner '72083315314786305' does not 
match current session '72083315314786306' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at 
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:2185)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:2123)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:2090)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:102)
    at repro.BrokerRegistrationTest.main(BrokerRegistrationTest.java:172)
{code}
Note that, in the diagram, we introduce a delay in the expiration of the 
session 0x1001764b3920001. The expiration of a session, and the deletion of its 
associated ephemeral nodes, is scheduled on the synchronous processor 
asynchronously with respect to incoming requests, so it is possible for the 
multi request to arrive and be consumed by the synchronous processor before the 
closeSession request for 0x1001764b3920001. The ephemeral znode for that 
session is therefore still present in Zookeeper's data tree when the new multi 
request is processed.
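The ordering argument can be replayed with a toy model in which a single FIFO 
consumer stands in for the server's request processors (a simplification; the 
real pipeline has several stages). Requests are applied in the order of the 
reproduction, and ephemeral znodes are deleted only when their session's 
closeSession request is dequeued. The class and method names are ours.
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Toy model: one FIFO consumer stands in for the server's request
 * processors (a simplification). Each method call is one dequeued request.
 */
public class OrderedProcessor {

    final Set<Long> sessions = new HashSet<>();
    final Map<String, Long> ephemerals = new HashMap<>(); // path -> owner session

    String createSession(long sid) {
        sessions.add(sid);
        return "OK";
    }

    String createEphemeral(long sid, String path) {
        if (!sessions.contains(sid)) return "SESSIONEXPIRED";
        if (ephemerals.containsKey(path)) return "NODEEXISTS";
        ephemerals.put(path, sid);
        return "OK";
    }

    // Closing a session deletes its ephemeral znodes, at dequeue time only.
    String closeSession(long sid) {
        sessions.remove(sid);
        ephemerals.values().removeIf(owner -> owner == sid);
        return "OK";
    }

    public static void main(String[] args) {
        OrderedProcessor server = new OrderedProcessor();
        long orphan = 0x1001764b3920001L;  // response never reached the client
        long current = 0x1001764b3920002L; // session the client knows
        String path = "/brokers/ids/18";

        // Dequeue order observed in the reproduction.
        System.out.println(server.createSession(orphan));          // OK
        System.out.println(server.createEphemeral(orphan, path));  // OK (response dropped)
        System.out.println(server.createSession(current));         // OK
        System.out.println(server.createEphemeral(current, path)); // NODEEXISTS
        System.out.println(server.closeSession(orphan));           // OK, deferred expiry
        System.out.println(server.ephemerals.containsKey(path));   // false: broker unregistered
    }
}
{code}
Swapping the last two requests, that is, processing the closeSession for 
0x1001764b3920001 before the retried multi, lets the broker's create succeed: 
in this model the failure depends only on the relative order of these two 
requests in the queue.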

  was:
Our production environment faced a case where the registration of a broker 
failed due to the presence of a "conflicting" broker znode in Zookeeper. This 
case is similar to the one fixed by KAFKA-6584, which was induced by the 
Zookeeper bug (or feature) tracked in ZOOKEEPER-2985, still open as of today.

A network partition disrupted communication between the Kafka and Zookeeper 
clusters for about 20% of the brokers in the cluster. One of these brokers was 
not able to re-register with Zookeeper and was excluded from the cluster until 
it was restarted. The broker logs show that the registration failed because of 
a "conflicting" znode write which, in this case, does not exactly match the 
scenario covered by KAFKA-6584.

The sequence of logs on the broker is as follows.

First, a connection is established with Zookeeper node 3.
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
An existing Zookeeper session had expired and, upon reconnection, the Zookeeper 
state change handler was invoked. The creation of the ephemeral znode 
/brokers/ids/18 started on the controller thread.
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient){code}
The client "session" timed out after 6 seconds. Note that the session ID is 0x0 
and that the "{_}Session establishment complete{_}" log is absent: the broker 
appears to have never received or processed the response from the Zookeeper 
node.
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
server in 6000ms for sessionid 0x0, closing socket connection and attempting 
reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
started waiting on a new connection notification.
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient){code}
A new connection was created with Zookeeper node 1. Note that a valid (new) 
session ({{0x1006c6e0b830001}}) was reported by Kafka this time.
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server 
zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 
(org.apache.zookeeper.ClientCnxn){code}
The Kafka ZK client is notified of the connection.
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
The broker sends the request to create the znode {{/brokers/ids/18}}, which 
already exists. The error path implemented for KAFKA-6584 is then followed. 
However, in this case, the session owning the ephemeral node, 
{{0x300000043230ac1}} ({{216172783240153793}}), is different from the last 
active Zookeeper session recorded by the broker. It is also different from the 
current session {{0x1006c6e0b830001}} ({{72176813933264897}}), hence the 
recreation of the broker znode is not attempted.
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
/brokers/ids/18, node already exists and owner '216172783240153793' does not 
match current session '72176813933264897' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
NodeExists
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
        at 
kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
        at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
        at 
kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
        at kafka.controller.KafkaController.process(KafkaController.scala:1853)
        at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper 
server logs:
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for 
session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
{code}
The ephemeral znode is then deleted and never recreated, and the broker is left 
unregistered. At this point, only a broker restart (or a forced recreation of 
the Zookeeper session) can remediate the situation.

An analysis of the Zookeeper commit logs shows the following sequence of 
transactions, with timestamps from the Zookeeper node:
 - 2023-03-05T16:02:00.973Z: CreateSession [-10] => 0x300000043230ac1
 - 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), SetData(<BrokerInfo>)]
 - 2023-03-05T16:02:21.336Z: CloseSession [-11]

The fix for KAFKA-6584 does not cover this case because here, the session ID is 
never surfaced to, nor recorded by, the ZK client in Kafka (there were no 
lower-level logs to ascertain whether the Netty client ever received any 
response for that session).

As a remediation, the source of identity of the broker (currently conveyed by 
the Zookeeper session ID) could perhaps be added explicitly to the znode data. 
Assuming the Zookeeper multi is atomic, the znode carries the BrokerInfo (or 
any other user data provided with the SetData operation) if and only if it was 
successfully created.
----
*Update:* this can be reproduced with this [automated 
test|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg]. 
The sequence of events produced by the test is the following.

!phoque.png!

1) The Zookeeper client is created by the application. It opens a TCP 
connection, then sends a Connect request, which is processed on the Netty NIO 
thread pool. A CreateSession request is internally enqueued to be handled by 
the synchronous request processor. Once it is processed, a session ID is 
generated and recorded, and that session ID, 0x1001764b3920000, is returned to 
the client.
{code:java}
[2023-03-29 16:22:04,258] INFO Socket connection established, initiating 
session, client: /127.0.0.1:50093, server: localhost/127.0.0.1:2181 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,268] INFO channel is connected: [id: 0x2e3e926b, 
L:/127.0.0.1:50093 - R:localhost/127.0.0.1:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920000 type:createSession cxid:0x0 zxid:0xcc0 txntype:-10 
reqpath:n/a
[2023-03-29 16:22:04,310] INFO Session establishment complete on server 
localhost/127.0.0.1:2181, session id = 0x1001764b3920000, negotiated timeout = 
18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,315] INFO [ZooKeeperClient ZkClient] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
2) The Kafka client built on top of the Zookeeper client is used to register 
the broker. The multiTransaction API of the Zookeeper client is invoked, and a 
multi request containing CreateNode and SetData operations is sent to 
Zookeeper.
{code:java}
[2023-03-29 16:22:04,470] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient)
sessionid:0x1001764b3920000 type:multi cxid:0x1 zxid:0xcc1 txntype:14 
reqpath:n/a
[2023-03-29 16:22:04,560] INFO Stat of the created znode at /brokers/ids/18 is: 
3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265 
(kafka.zk.KafkaZkClient)
[2023-03-29 16:22:04,561] INFO Registered broker 18 at path /brokers/ids/18 
with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 3265 
(kafka.zk.KafkaZkClient){code}
3) The client generates a Ping request every 6 seconds (read timeout / 2). The 
responses to these pings are not sent back to the client (this is enforced by 
the test; see the "Dropping" keyword in stdout). After 12 seconds (the read 
timeout), the client initiates a new connection.
{code:java}
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe 
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe 
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[2023-03-29 16:22:16,555] WARN Client session timed out, have not heard from 
server in 12001ms for session id 0x1001764b3920000 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,559] WARN Session 0x1001764b3920000 for server 
localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect 
except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,567] INFO channel is disconnected: [id: 0x2e3e926b, 
L:/127.0.0.1:50093 ! R:localhost/127.0.0.1:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
4) A new TCP connection is opened and the Connect request is sent. A delay is 
artificially injected to make the connection request time out (the connection 
timeout is equal to the session timeout in this case).
{code:java}
[2023-03-29 16:22:18,388] INFO Opening socket connection to server 
localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,395] INFO Socket connection established, initiating 
session, client: /[0:0:0:0:0:0:0:1]:50121, server: 
localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,396] INFO channel is connected: [id: 0xc39fba08, 
L:/[0:0:0:0:0:0:0:1]:50121 - R:localhost/[0:0:0:0:0:0:0:1]:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
>>>> CONNECTION DELAY = 18500 ms{code}
5) After 18 seconds (which is both the session and the connection timeout), the 
server expires the session: an internal closeSession request is generated on 
the server and enqueued to be processed by the synchronous request processor.
{code:java}
[2023-03-29 16:22:34,638] INFO Expiring session 0x1001764b3920000, timeout of 
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
sessionid:0x1001764b3920000 type:closeSession cxid:0x0 zxid:0xcc3 txntype:-11 
reqpath:n/a
[2023-03-29 16:22:36,401] WARN Client session timed out, have not heard from 
server in 18005ms for session id 0x1001764b3920000 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:36,401] WARN Session 0x1001764b3920000 for server 
localhost/[0:0:0:0:0:0:0:1]:2181, Closing socket connection. Attempting 
reconnect except it is a SessionExpiredException. 
(org.apache.zookeeper.ClientCnxn){code}
6) In parallel, the client detects the connection timeout and reconnects. A new 
TCP connection is opened, and the client is notified by the server that the 
session 0x1001764b3920000 has expired.
{code:java}
[2023-03-29 16:22:37,972] INFO Opening socket connection to server 
localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO Socket connection established, initiating 
session, client: /127.0.0.1:50131, server: localhost/127.0.0.1:2181 
(org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO channel is connected: [id: 0x17aecbd7, 
L:/127.0.0.1:50131 - R:localhost/127.0.0.1:2181] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-29 16:22:37,988] INFO Invalid session 0x1001764b3920000 for client 
/127.0.0.1:50131, probably expired (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-29 16:22:37,989] WARN Unable to reconnect to ZooKeeper service, 
session 0x1001764b3920000 has expired (org.apache.zookeeper.ClientCnxn){code}
7) The client closes its Zookeeper "handle" (roughly, the client in Zookeeper 
terminology).
{code:java}
[2023-03-29 16:22:37,994] INFO EventThread shut down for session: 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,996] INFO [ZooKeeperClient ZkClient] Session expired. (kafka.zookeeper.ZooKeeperClient)
[2023-03-29 16:22:37,998] INFO [ZooKeeperClient ZkClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient){code}
8) The client creates a new handle, which initiates a new session with
Zookeeper. A new TCP connection is opened, and the createSession request is
propagated to the synchronous request processor. The server creates the session,
but the response is never sent back to the client.
{code:java}
[2023-03-29 16:22:37,998] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@649bec2e (org.apache.zookeeper.ZooKeeper)
[2023-03-29 16:22:37,999] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,005] INFO Socket connection established, initiating session, client: /127.0.0.1:50132, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,006] INFO channel is connected: [id: 0x435b25a4, L:/127.0.0.1:50132 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
[ALTERED RQ] sessionid:0x1001764b3920001 type:createSession cxid:0x0 zxid:0xcc4 txntype:-10 reqpath:n/a
Dropping [Ljava.nio.ByteBuffer;@2a9f4f64{code}
9) An inter-thread signal triggered during the creation of the second session
on the synchronous request processor is captured by the main (application)
thread, which starts a broker registration. The client immediately sends the
multi request over the open TCP connection, before receiving confirmation that
the ConnectRequest was processed and without knowing the id of the current
session. The server processes the multi request and creates the znode
/brokers/ids/18, owned by session 0x1001764b3920001. The response is not sent
to the client, which is therefore unaware that the request has been processed.
{code:java}
[2023-03-29 16:22:38,023] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient)
[ALTERED RQ] sessionid:0x1001764b3920001 type:multi cxid:0x1 zxid:0xcc5 txntype:14 reqpath:n/a
Dropping 1,3269,0 org.apache.zookeeper.MultiResponse@99bc5055{code}
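The mismatch created by steps 8 and 9 can be shown with a toy model of the two points of view: the server attributes the pipelined create to the connection's new session, while the client, having never received the ConnectResponse, still holds session id 0x0. This is a sketch under that assumption; {{PipelinedCreateSketch}} and its methods are hypothetical.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of steps 8-9: what the server knows versus what the client knows.
public class PipelinedCreateSketch {
    // Server view: ephemeral znode path -> owning session id.
    public final Map<String, Long> ephemeralOwners = new HashMap<>();
    // Client view: session id learned from the ConnectResponse (0 until one arrives).
    public long clientSessionId = 0L;

    // The server creates the session for this connection. In the reproduction the
    // response is dropped, so the client never learns the new id.
    public long createSession(long newSessionId, boolean responseDelivered) {
        if (responseDelivered) {
            clientSessionId = newSessionId;
        }
        return newSessionId;
    }

    // A request written on the connection is attributed by the server to the
    // connection's session, whatever the client believes its session id to be.
    public boolean createEphemeral(long connectionSessionId, String path) {
        return ephemeralOwners.putIfAbsent(path, connectionSessionId) == null;
    }

    public static void main(String[] args) {
        PipelinedCreateSketch s = new PipelinedCreateSketch();
        long sid = s.createSession(0x1001764b3920001L, false); // step 8: response dropped
        s.createEphemeral(sid, "/brokers/ids/18");             // step 9: pipelined multi
        System.out.printf("client session: 0x%x, znode owner: 0x%x%n",
                s.clientSessionId, s.ephemeralOwners.get("/brokers/ids/18"));
        // prints "client session: 0x0, znode owner: 0x1001764b3920001"
    }
}
{code}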
10) The connection times out again, since no response reaches the client within
18 seconds. The Zookeeper client library fails the multi request with a
CONNECTIONLOSS error. Kafka's Zookeeper client, built on top of that library,
retries on this type of error: it waits for a new connection to be established
and then resends the request.
{code:java}
[2023-03-29 16:22:56,012] WARN Client session timed out, have not heard from server in 18005ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:56,014] WARN Session 0x0 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn){code}
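The retry behavior of step 10 can be sketched as a loop that resends a request while it fails with CONNECTIONLOSS. This is not Kafka's actual code: {{retryUntilConnected}}, the {{Code}} enum and the scripted replies are hypothetical. The point it illustrates is that a create is not idempotent, so when the first attempt was applied on the server but its response was lost, the retry can only observe NODEEXISTS.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of retry-on-CONNECTIONLOSS; not Kafka's actual implementation.
public class RetryOnConnectionLossSketch {
    public enum Code { OK, CONNECTIONLOSS, NODEEXISTS }

    // Resends the request while it fails with CONNECTIONLOSS, waiting for a new
    // connection between attempts. Any other result code is returned as-is.
    public static Code retryUntilConnected(Supplier<Code> send, Runnable waitUntilConnected) {
        while (true) {
            Code rc = send.get();
            if (rc != Code.CONNECTIONLOSS) {
                return rc;
            }
            waitUntilConnected.run();
        }
    }

    public static void main(String[] args) {
        // Scripted replies for steps 10-12: the first attempt was applied by the server
        // but its response was lost (CONNECTIONLOSS); the retry then hits the znode
        // that the first attempt created.
        Deque<Code> replies = new ArrayDeque<>(List.of(Code.CONNECTIONLOSS, Code.NODEEXISTS));
        Code result = retryUntilConnected(replies::poll, () -> { /* reconnection elided */ });
        System.out.println(result); // NODEEXISTS
    }
}
{code}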
11) A new connection attempt is made. This time, the test lets it go through.
{code:java}
[2023-03-29 16:22:57,490] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,494] INFO Socket connection established, initiating session, client: /127.0.0.1:50166, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,495] INFO channel is connected: [id: 0xa086adad, L:/127.0.0.1:50166 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920002 type:createSession cxid:0x0 zxid:0xcc6 txntype:-10 reqpath:n/a
[2023-03-29 16:22:57,522] INFO Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x1001764b3920002, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,522] INFO [ZooKeeperClient ZkClient] Connected. (kafka.zookeeper.ZooKeeperClient){code}
12) The multi request is retried and fails with NODEEXISTS. Kafka then sends a
getData to Zookeeper to find the ephemeral owner of the znode. The ephemeral
owner is 0x1001764b3920001 (72083315314786305), which matches neither the
current session 0x1001764b3920002 (72083315314786306) nor the session recorded
by the Kafka client during the first znode creation (0x1001764b3920000).
{code:java}
sessionid:0x1001764b3920002 type:multi cxid:0x2 zxid:0xcc7 txntype:14 reqpath:n/a
sessionid:0x1001764b3920002 type:getData cxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:/brokers/ids/18
[2023-03-29 16:22:57,539] ERROR Error while creating ephemeral at /brokers/ids/18, node already exists and owner '72083315314786305' does not match current session '72083315314786306' (kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:2185)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:2123)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:2090)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:102)
    at repro.BrokerRegistrationTest.main(BrokerRegistrationTest.java:172)
{code}
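The ownership check in step 12 compares session ids that the logs print in two notations (hexadecimal in Zookeeper messages, decimal in the Kafka error). A minimal sketch of the conversion and of the comparison follows; the method names are hypothetical and only illustrate the rule described above, not Kafka's code.
{code:java}
// Hypothetical sketch of the ownership check described in step 12; names are illustrative.
public class EphemeralOwnerCheckSketch {
    // Parses a session id as printed in Zookeeper logs ("0x" followed by hex digits).
    public static long parseSessionId(String text) {
        String hex = text.startsWith("0x") ? text.substring(2) : text;
        return Long.parseUnsignedLong(hex, 16);
    }

    // A NODEEXISTS on the ephemeral create is only treated as success when the
    // existing znode is owned by the current session; otherwise it is an error.
    public static boolean ownedByCurrentSession(long ephemeralOwner, long currentSessionId) {
        return ephemeralOwner == currentSessionId;
    }

    public static void main(String[] args) {
        long owner = parseSessionId("0x1001764b3920001");   // created in step 9
        long current = parseSessionId("0x1001764b3920002"); // established in step 11
        System.out.println(owner + " " + current + " " + ownedByCurrentSession(owner, current));
        // prints "72083315314786305 72083315314786306 false"
    }
}
{code}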
Note that the diagram shows a delay in the expiration of session
0x1001764b3920001. The expiration of a session, and the deletion of its
ephemeral znodes, is scheduled on the synchronous processor asynchronously with
respect to incoming requests, so the retried multi request can be consumed by
the synchronous processor before the closeSession request for
0x1001764b3920001. The ephemeral znode owned by that session is therefore still
present in Zookeeper's data tree when the new multi request is processed.
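The ordering argument above can be checked with a toy, single-threaded model of the synchronous processor that handles requests strictly in queue order. The data tree is reduced to a map from ephemeral path to owning session; {{SyncProcessorOrderingSketch}}, {{Request}} and {{run}} are hypothetical names. When the retried create is dequeued before the closeSession, it fails with NODEEXISTS and the znode then disappears; in the opposite order the registration succeeds.
{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded model of the synchronous request processor.
public class SyncProcessorOrderingSketch {
    public static final class Request {
        final String type;     // "create" or "closeSession"
        final long sessionId;
        final String path;     // null for closeSession

        public Request(String type, long sessionId, String path) {
            this.type = type;
            this.sessionId = sessionId;
            this.path = path;
        }
    }

    // Data tree reduced to ephemeral znodes: path -> owning session id.
    public final Map<String, Long> dataTree = new HashMap<>();
    public final List<String> results = new ArrayList<>();

    // Processes requests strictly in queue order, recording one result per request.
    public void process(Deque<Request> queue) {
        while (!queue.isEmpty()) {
            Request r = queue.poll();
            if (r.type.equals("create")) {
                results.add(dataTree.putIfAbsent(r.path, r.sessionId) == null ? "OK" : "NODEEXISTS");
            } else if (r.type.equals("closeSession")) {
                // Closing a session deletes every ephemeral znode it owns.
                dataTree.values().removeIf(owner -> owner == r.sessionId);
                results.add("CLOSED");
            } else {
                throw new IllegalArgumentException("unknown request type: " + r.type);
            }
        }
    }

    // Starts from /brokers/ids/18 owned by the orphaned session (step 9), then
    // processes the retried create and the closeSession in the chosen order.
    public static SyncProcessorOrderingSketch run(boolean closeSessionFirst) {
        long orphan = 0x1001764b3920001L;   // session whose create response was lost
        long current = 0x1001764b3920002L;  // session established in step 11
        SyncProcessorOrderingSketch p = new SyncProcessorOrderingSketch();
        p.dataTree.put("/brokers/ids/18", orphan);
        Request close = new Request("closeSession", orphan, null);
        Request retry = new Request("create", current, "/brokers/ids/18");
        Deque<Request> queue = new ArrayDeque<>();
        if (closeSessionFirst) {
            queue.add(close);
            queue.add(retry);
        } else {
            queue.add(retry);
            queue.add(close);
        }
        p.process(queue);
        return p;
    }

    public static void main(String[] args) {
        SyncProcessorOrderingSketch observed = run(false);
        System.out.println(observed.results + " " + observed.dataTree); // [NODEEXISTS, CLOSED] {}
        SyncProcessorOrderingSketch other = run(true);
        System.out.println(other.results + " " + other.dataTree.keySet()); // [CLOSED, OK] [/brokers/ids/18]
    }
}
{code}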


> Broker ZNode creation can fail due to lost Zookeeper Session ID
> ---------------------------------------------------------------
>
>                 Key: KAFKA-14845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14845
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Alexandre Dupriez
>            Assignee: Alexandre Dupriez
>            Priority: Minor
>         Attachments: kafka-broker-reg.log, phoque.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
