[ 
https://issues.apache.org/jira/browse/KAFKA-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-14845:
--------------------------------------
    Description: 
Our production environment faced a use case where registration of a broker 
failed due to the presence of a "conflicting" broker znode in Zookeeper. This 
case is not without familiarity to that fixed by KAFKA-6584 and induced by the 
Zookeeper bug (or feature) tracked in ZOOKEEPER-2985 opened as of today.

A network partition disturbed communication channels between the Kafka and 
Zookeeper clusters for about 20% of the brokers in the cluster. One of this 
broker was not able to re-register with Zookeeper and was excluded from the 
cluster until it was restarted. Broker logs show the failed registration due to 
a "conflicting" znode write which in this case does not exactly match the 
scenario covered by KAFKA-6584.

The sequence of logs on the broker is as follows.

First, a connection is established with the Zookeeper node 3.
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
An existing Zookeeper session was expired, and upon reconnection, the Zookeeper 
state change handler was invoked. The creation of the ephemeral znode 
/brokers/ids/18 started on the controller thread.
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient){code}
The client "session" timed out after 6 seconds. Note the session is 0x0 and the 
absence of "{_}Session establishment complete{_}" log: the broker appears to 
have never received or processed the response from the Zookeeper node.
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
server in 6000ms for sessionid 0x0, closing socket connection and attempting 
reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
started waiting on a new connection notification.
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient){code}
A new connection was created with the Zookeeper node 1. Note that a valid (new) 
session ({{{}0x1006c6e0b830001{}}}) was reported by Kafka this time.
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server 
zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 
(org.apache.zookeeper.ClientCnxn){code}
The Kafka ZK client is notified of the connection.
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
The broker sends the request to create the znode {{/brokers/ids/18}} which 
already exists. The error path implemented for KAFKA-6584 is then followed. 
However, in this case, the session owning the ephemeral node 
{{0x300000043230ac1}} ({{{}216172783240153793{}}}) is different from the last 
active Zookeeper session which the broker has recorded. And it is also 
different from the current session {{0x1006c6e0b830001}} 
({{{}72176813933264897{}}}), hence the recreation of the broker znode is not 
attempted.
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
/brokers/ids/18, node already exists and owner '216172783240153793' does not 
match current session '72176813933264897' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
NodeExists
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
        at 
kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
        at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
        at 
kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
        at kafka.controller.KafkaController.process(KafkaController.scala:1853)
        at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper 
server logs:
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for 
session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
{code}
The ephemeral znode is then deleted and never recreated. The broker is not 
registered. Only a broker restart (or forced recreation of the Zookeeper 
session) can mitigate at this point.

I fail to understand where the session {{0x300000043230ac1}} comes from. An 
analysis of the commit logs from Zookeeper does show the following sequence of 
transactions with timestamps from the Zookeeper node.
 - 2023-03-05T16:02:00.973Z: CreationSession [-10] => 0x300000043230ac1
 - 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), 
SetData(<BrokerInfo>)]
 - 2023-03-05T16:02:21.336Z: CloseSession [-11]

-A code based example was attempted to reproduce the use case (available 
[here|https://github.com/Hangleton/kafka-tools/blob/master/src/main/java/kafka/repro/BrokerRegistrationTest.java]),
 however, it fails to reproduce the occurrence of the "phantom" intermediate 
session. Instead, the test case forces session renewal after the Multi 
transaction is processed for the first time. It basically tries to produce the 
following sequence.-

!broker-registration.drawio.png!
 - -The ephemeral owner recorded after a successful registration is 
{{{}0xAAAAAAAAAAAAAA{}}}.-
 - -The session expires and a new one ({{{}0x1000c7cbe1d0000{}}}) is created.-
 - -The broker registration callback is invoked on the controller. The Zk 
client enqueues the {{Multi}} request.-
 - -The response is never receives by the broker.-
 - -The client connection times out.-
 - -The session expires.-
 - -Upon reconnection, a new session ({{{}0x1000c7cbe1d0001{}}}) is created.-
 - -The Zk client retries upon receiving the {{CONNECTIONLOSS}} response.-

-Note the session renewal right after the response from the first Multi request 
is dropped. There is a race condition between the {{AdminZkClient}} and the 
underlying ZK client which is explicitly controlled in this test so that the 
{{AdminZkClient}} does not see the ZK client as disconnected.-

The fix for KAFKA-6584 does not cover this case because here, the session ID is 
not surfaced and recorded by the ZK client in Kafka (there was no lower-level 
logs to ascertain if the Netty client ever received any response for that 
session).

As a remediation, perhaps the source of identity of the broker (currently 
conveyed by the Zookeeper session ID) could be explicitly added to the znode 
data (assuming the Zookeeper Multi is atomic, the znode must have the 
BrokerInfo (or any other user data provided with the SetData command) if and 
only if it is successfully created).

  was:
Our production environment faced a use case where registration of a broker 
failed due to the presence of a "conflicting" broker znode in Zookeeper. This 
case is not without familiarity to that fixed by KAFKA-6584 and induced by the 
Zookeeper bug (or feature) tracked in ZOOKEEPER-2985 opened as of today.

A network partition disturbed communication channels between the Kafka and 
Zookeeper clusters for about 20% of the brokers in the cluster. One of this 
broker was not able to re-register with Zookeeper and was excluded from the 
cluster until it was restarted. Broker logs show the failed registration due to 
a "conflicting" znode write which in this case does not exactly match the 
scenario covered by KAFKA-6584.

The sequence of logs on the broker is as follows.

First, a connection is established with the Zookeeper node 3.
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
An existing Zookeeper session was expired, and upon reconnection, the Zookeeper 
state change handler was invoked. The creation of the ephemeral znode 
/brokers/ids/18 started on the controller thread.
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
(kafka.zk.KafkaZkClient){code}
The client "session" timed out after 6 seconds. Note the session is 0x0 and the 
absence of "{_}Session establishment complete{_}" log: the broker appears to 
have never received or processed the response from the Zookeeper node.
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
server in 6000ms for sessionid 0x0, closing socket connection and attempting 
reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty){code}
Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
started waiting on a new connection notification.
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient){code}
A new connection was created with the Zookeeper node 1. Note that a valid (new) 
session ({{{}0x1006c6e0b830001{}}}) was reported by Kafka this time.
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
(org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
(org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server 
zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 
(org.apache.zookeeper.ClientCnxn){code}
The Kafka ZK client is notified of the connection.
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient){code}
The broker sends the request to create the znode {{/brokers/ids/18}} which 
already exists. The error path implemented for KAFKA-6584 is then followed. 
However, in this case, the session owning the ephemeral node 
{{0x300000043230ac1}} ({{{}216172783240153793{}}}) is different from the last 
active Zookeeper session which the broker has recorded. And it is also 
different from the current session {{0x1006c6e0b830001}} 
({{{}72176813933264897{}}}), hence the recreation of the broker znode is not 
attempted.
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
/brokers/ids/18, node already exists and owner '216172783240153793' does not 
match current session '72176813933264897' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
NodeExists
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
        at 
kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
        at 
kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
        at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
        at 
kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
        at kafka.controller.KafkaController.process(KafkaController.scala:1853)
        at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper 
server logs:
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 
18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for 
session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
{code}
The ephemeral znode is then deleted and never recreated. The broker is not 
registered. Only a broker restart (or forced recreation of the Zookeeper 
session) can mitigate at this point.

I fail to understand where the session {{0x300000043230ac1}} comes from. An 
analysis of the commit logs from Zookeeper does show the following sequence of 
transactions with timestamps from the Zookeeper node.
 - 2023-03-05T16:02:00.973Z: CreationSession [-10] => 0x300000043230ac1
 - 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), 
SetData(<BrokerInfo>)]
 - 2023-03-05T16:02:21.336Z: CloseSession [-11]

A code based example was attempted to reproduce the use case (available 
[here|https://github.com/Hangleton/kafka-tools/blob/master/src/main/java/kafka/repro/BrokerRegistrationTest.java]),
 however, it fails to reproduce the occurrence of the "phantom" intermediate 
session. Instead, the test case forces session renewal after the Multi 
transaction is processed for the first time. It basically tries to produce the 
following sequence.

!broker-registration.drawio.png!

- The ephemeral owner recorded after a successful registration is 
{{0xAAAAAAAAAAAAAA}}.
- The session expires and a new one ({{0x1000c7cbe1d0000}}) is created.
- The broker registration callback is invoked on the controller. The Zk client 
enqueues the {{Multi}} request.
- The response is never receives by the broker.
- The client connection times out.
- The session expires.
- Upon reconnection, a new session ({{0x1000c7cbe1d0001}}) is created.
- The Zk client retries upon receiving the {{CONNECTIONLOSS}} response.

Note the session renewal right after the response from the first Multi request 
is dropped. There is a race condition between the {{AdminZkClient}} and the 
underlying ZK client which is explicitly controlled in this test so that the 
{{AdminZkClient}} does not see the ZK client as disconnected.

The fix for KAFKA-6584 does not cover this case because here, the session ID is 
not surfaced and recorded by the ZK client in Kafka (there was no lower-level 
logs to ascertain if the Netty client ever received any response for that 
session).

As a remediation, perhaps the source of identity of the broker (currently 
conveyed by the Zookeeper session ID) could be explicitly added to the znode 
data (assuming the Zookeeper Multi is atomic, the znode must have the 
BrokerInfo (or any other user data provided with the SetData command) if and 
only if it is successfully created).


> Broker ZNode creation can fail due to a session ID unknown to the broker
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-14845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14845
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Alexandre Dupriez
>            Assignee: Alexandre Dupriez
>            Priority: Minor
>         Attachments: broker-registration.drawio.png
>
>
> Our production environment faced a use case where registration of a broker 
> failed due to the presence of a "conflicting" broker znode in Zookeeper. This 
> case is not without familiarity to that fixed by KAFKA-6584 and induced by 
> the Zookeeper bug (or feature) tracked in ZOOKEEPER-2985 opened as of today.
> A network partition disturbed communication channels between the Kafka and 
> Zookeeper clusters for about 20% of the brokers in the cluster. One of this 
> broker was not able to re-register with Zookeeper and was excluded from the 
> cluster until it was restarted. Broker logs show the failed registration due 
> to a "conflicting" znode write which in this case does not exactly match the 
> scenario covered by KAFKA-6584.
> The sequence of logs on the broker is as follows.
> First, a connection is established with the Zookeeper node 3.
> {code:java}
> [2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
> session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
> L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty){code}
> An existing Zookeeper session was expired, and upon reconnection, the 
> Zookeeper state change handler was invoked. The creation of the ephemeral 
> znode /brokers/ids/18 started on the controller thread.
> {code:java}
> [2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
> (kafka.zk.KafkaZkClient){code}
> The client "session" timed out after 6 seconds. Note the session is 0x0 and 
> the absence of "{_}Session establishment complete{_}" log: the broker appears 
> to have never received or processed the response from the Zookeeper node.
> {code:java}
> [2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
> server in 6000ms for sessionid 0x0, closing socket connection and attempting 
> reconnect (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
> L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty){code}
> Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
> started waiting on a new connection notification.
> {code:java}
> [2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient){code}
> A new connection was created with the Zookeeper node 1. Note that a valid 
> (new) session ({{{}0x1006c6e0b830001{}}}) was reported by Kafka this time.
> {code:java}
> [2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
> session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
> L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty)
> [2023-03-05 16:02:03,054] INFO Session establishment complete on server 
> zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 
> 18000 (org.apache.zookeeper.ClientCnxn){code}
> The Kafka ZK client is notified of the connection.
> {code:java}
> [2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient){code}
> The broker sends the request to create the znode {{/brokers/ids/18}} which 
> already exists. The error path implemented for KAFKA-6584 is then followed. 
> However, in this case, the session owning the ephemeral node 
> {{0x300000043230ac1}} ({{{}216172783240153793{}}}) is different from the last 
> active Zookeeper session which the broker has recorded. And it is also 
> different from the current session {{0x1006c6e0b830001}} 
> ({{{}72176813933264897{}}}), hence the recreation of the broker znode is not 
> attempted.
> {code:java}
> [2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
> /brokers/ids/18, node already exists and owner '216172783240153793' does not 
> match current session '72176813933264897' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
> NodeExists
>         at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
>         at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
>         at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
>         at 
> kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
>         at 
> kafka.controller.KafkaController.process(KafkaController.scala:1853)
>         at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
>         at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
> The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper 
> server logs:
> {code:java}
> [2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 
> 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> [2023-03-05 16:02:21,336] INFO Submitting global closeSession request for 
> session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
> {code}
> The ephemeral znode is then deleted and never recreated. The broker is not 
> registered. Only a broker restart (or forced recreation of the Zookeeper 
> session) can mitigate at this point.
> I fail to understand where the session {{0x300000043230ac1}} comes from. An 
> analysis of the commit logs from Zookeeper does show the following sequence 
> of transactions with timestamps from the Zookeeper node.
>  - 2023-03-05T16:02:00.973Z: CreationSession [-10] => 0x300000043230ac1
>  - 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), 
> SetData(<BrokerInfo>)]
>  - 2023-03-05T16:02:21.336Z: CloseSession [-11]
> -A code based example was attempted to reproduce the use case (available 
> [here|https://github.com/Hangleton/kafka-tools/blob/master/src/main/java/kafka/repro/BrokerRegistrationTest.java]),
>  however, it fails to reproduce the occurrence of the "phantom" intermediate 
> session. Instead, the test case forces session renewal after the Multi 
> transaction is processed for the first time. It basically tries to produce 
> the following sequence.-
> !broker-registration.drawio.png!
>  - -The ephemeral owner recorded after a successful registration is 
> {{{}0xAAAAAAAAAAAAAA{}}}.-
>  - -The session expires and a new one ({{{}0x1000c7cbe1d0000{}}}) is created.-
>  - -The broker registration callback is invoked on the controller. The Zk 
> client enqueues the {{Multi}} request.-
>  - -The response is never receives by the broker.-
>  - -The client connection times out.-
>  - -The session expires.-
>  - -Upon reconnection, a new session ({{{}0x1000c7cbe1d0001{}}}) is created.-
>  - -The Zk client retries upon receiving the {{CONNECTIONLOSS}} response.-
> -Note the session renewal right after the response from the first Multi 
> request is dropped. There is a race condition between the {{AdminZkClient}} 
> and the underlying ZK client which is explicitly controlled in this test so 
> that the {{AdminZkClient}} does not see the ZK client as disconnected.-
> The fix for KAFKA-6584 does not cover this case because here, the session ID 
> is not surfaced and recorded by the ZK client in Kafka (there was no 
> lower-level logs to ascertain if the Netty client ever received any response 
> for that session).
> As a remediation, perhaps the source of identity of the broker (currently 
> conveyed by the Zookeeper session ID) could be explicitly added to the znode 
> data (assuming the Zookeeper Multi is atomic, the znode must have the 
> BrokerInfo (or any other user data provided with the SetData command) if and 
> only if it is successfully created).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to