[ https://issues.apache.org/jira/browse/KAFKA-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexandre Dupriez updated KAFKA-14845:
--------------------------------------
Description:

Our production environment faced a use case where registration of a broker failed due to the presence of a "conflicting" broker znode in Zookeeper. This case is similar to the one fixed by KAFKA-6584, which was induced by the Zookeeper bug (or feature) tracked in ZOOKEEPER-2985, still open as of today.

A network partition disturbed communication channels between the Kafka and Zookeeper clusters for about 20% of the brokers in the cluster. One of these brokers was not able to re-register with Zookeeper and was excluded from the cluster until it was restarted. Broker logs show that the registration failed due to a "conflicting" znode write which, in this case, does not exactly match the scenario covered by KAFKA-6584.

The sequence of logs on the broker is as follows.

First, a connection is established with the Zookeeper node 3.
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] (org.apache.zookeeper.ClientCnxnSocketNetty)
{code}
An existing Zookeeper session had expired and, upon reconnection, the Zookeeper state change handler was invoked. The creation of the ephemeral znode /brokers/ids/18 started on the controller thread.
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient)
{code}
The client "session" timed out after 6 seconds. Note that the session ID is 0x0 and that the "{_}Session establishment complete{_}" log is absent: the broker appears never to have received or processed the response from the Zookeeper node.
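The 6000 ms timeout logged for session 0x0 is consistent with how the ZooKeeper Java client is understood to derive its timers from the session timeout. The same applies to the 12000 ms and 18000 ms figures in the reproduction. The assumed rules, based on `ClientCnxn`, may differ across client versions:
 * until a session is established, a connect timeout of sessionTimeout divided by the number of servers in the connect string;
 * once the session is established, a read timeout of two thirds of the negotiated session timeout;
 * a ping roughly every half read timeout.

The sketch below only reproduces that arithmetic. `ZkClientTimers` is a hypothetical helper, not a ZooKeeper or Kafka class. The three-server connect string for the production cluster is also an assumption.

{code:java}
// Hypothetical helper: reproduces the arithmetic the ZooKeeper Java client is assumed
// to use (based on ClientCnxn) to derive its timers from the session timeout.
final class ZkClientTimers {
    private ZkClientTimers() {}

    /** Used while no session is established (session id 0x0): timeout split across servers. */
    static int connectTimeoutMs(int sessionTimeoutMs, int serverCount) {
        if (serverCount <= 0) {
            throw new IllegalArgumentException("serverCount must be positive");
        }
        return sessionTimeoutMs / serverCount;
    }

    /** Used once the session is established: two thirds of the negotiated session timeout. */
    static int readTimeoutMs(int negotiatedSessionTimeoutMs) {
        return negotiatedSessionTimeoutMs * 2 / 3;
    }

    /** Approximate idle interval after which the client sends a ping: half the read timeout. */
    static int pingIntervalMs(int negotiatedSessionTimeoutMs) {
        return readTimeoutMs(negotiatedSessionTimeoutMs) / 2;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 18_000; // negotiated timeout reported in the logs
        System.out.println(connectTimeoutMs(sessionTimeoutMs, 3)); // 6000: production, assuming 3 servers
        System.out.println(connectTimeoutMs(sessionTimeoutMs, 1)); // 18000: single-server reproduction
        System.out.println(readTimeoutMs(sessionTimeoutMs));       // 12000
        System.out.println(pingIntervalMs(sessionTimeoutMs));      // 6000
    }
}
{code}

Under these assumptions, a client that has not yet received its session ID can give up on a server after one third of the session timeout. Meanwhile, the session the server has already created stays alive for the full 18 seconds.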
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from server in 6000ms for sessionid 0x0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] (org.apache.zookeeper.ClientCnxnSocketNetty)
{code}
Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client started waiting for a new connection notification.
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
{code}
A new connection was created with the Zookeeper node 1. Note that a valid (new) session ({{0x1006c6e0b830001}}) was reported by Kafka this time.
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] (org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
{code}
The Kafka ZK client is notified of the connection.
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
{code}
The broker sends the request to create the znode {{/brokers/ids/18}}, which already exists, and the error path implemented for KAFKA-6584 is followed. However, in this case, the session owning the ephemeral node, {{0x300000043230ac1}} ({{216172783240153793}}), is different from the last active Zookeeper session recorded by the broker. It is also different from the current session {{0x1006c6e0b830001}} ({{72176813933264897}}), so the broker does not attempt to recreate its znode.
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at /brokers/ids/18, node already exists and owner '216172783240153793' does not match current session '72176813933264897' (kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
	at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
	at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
	at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
	at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
	at kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
	at kafka.controller.KafkaController.process(KafkaController.scala:1853)
	at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
	at kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
	at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
	at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}
The session {{0x300000043230ac1}} expires later on, as indicated in the Zookeeper server logs:
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
{code}
The ephemeral znode is then deleted and never recreated, so the broker is not registered. At this point, only a broker restart (or a forced recreation of the Zookeeper session) can mitigate the problem.

An analysis of the Zookeeper commit logs shows the following sequence of transactions, with timestamps from the Zookeeper node:
- 2023-03-05T16:02:00.973Z: CreateSession [-10] => 0x300000043230ac1
- 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), SetData(<BrokerInfo>)]
- 2023-03-05T16:02:21.336Z: CloseSession [-11]

The fix for KAFKA-6584 does not cover this case because, here, the session ID is never surfaced to or recorded by the ZK client in Kafka. There were no lower-level logs to ascertain whether the Netty client ever received any response for that session.

As a remediation, perhaps the source of identity of the broker (currently conveyed by the Zookeeper session ID) could be explicitly added to the znode data. Assuming the Zookeeper Multi is atomic, the znode contains the BrokerInfo (or any other user data provided with the SetData command) if and only if it was successfully created.

----
*Update:* this can be reproduced with this [automated test|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg]. The sequence of events produced by the test is the following.

!phoque.png!

1) The Zookeeper client is created by the application. It opens a TCP connection, then sends a Connect request which is processed on the Netty NIO thread pool. A CreateSession request is internally enqueued to be handled by the synchronous request processor. Once it is processed, a session ID is generated and recorded, and that session ID, 0x1001764b3920000, is returned to the client.
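Step 1 mentions that the server generates the session ID. The session IDs in this report can be cross-checked with a few lines of Java. The hexadecimal form printed by Zookeeper and the decimal form printed by Kafka are the same 64-bit value.

This sketch assumes the layout used by ZooKeeper's session tracker: the server ID sits in the top byte, and clock-derived bits seed the initial value, which is then incremented for each new session. Under that assumption, {{0x300000043230ac1}} would have been issued by server 3 and {{0x1006c6e0b830001}} by server 1. That would be consistent with the zk.3 and zk.1 connections in the broker logs, if those hosts have server IDs 3 and 1.

`ZkSessionIds` and its methods are hypothetical helpers. The clock value in `main` is chosen only to reproduce the first session ID of the reproduction.

{code:java}
// Hypothetical helpers for reading ZooKeeper session ids, assuming the layout used by
// ZooKeeper's session tracker: server id in the top byte, clock-derived bits below it.
final class ZkSessionIds {
    private ZkSessionIds() {}

    /** Server id assumed to be encoded in the top 8 bits of the session id. */
    static int serverId(long sessionId) {
        return (int) (sessionId >>> 56);
    }

    /** First session id a server would issue for a given clock reading (assumed layout). */
    static long initialSessionId(long serverId, long clockMillis) {
        return ((clockMillis << 24) >>> 8) | (serverId << 56);
    }

    public static void main(String[] args) {
        // Production: stale owner of /brokers/ids/18, then the broker's current session.
        long staleOwner = 0x300000043230ac1L;
        long current = 0x1006c6e0b830001L;
        System.out.println(staleOwner + " from server " + serverId(staleOwner)); // 216172783240153793 from server 3
        System.out.println(current + " from server " + serverId(current));       // 72176813933264897 from server 1

        // Reproduction: first session id, then the decimal owner from the ERROR log in hexadecimal.
        System.out.println(Long.toHexString(initialSessionId(1, 0x1764b392L))); // 1001764b3920000
        System.out.println(Long.toHexString(72083315314786305L));                // 1001764b3920001
    }
}
{code}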
{code:java}
[2023-03-29 16:22:04,258] INFO Socket connection established, initiating session, client: /127.0.0.1:50093, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,268] INFO channel is connected: [id: 0x2e3e926b, L:/127.0.0.1:50093 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920000 type:createSession cxid:0x0 zxid:0xcc0 txntype:-10 reqpath:n/a
[2023-03-29 16:22:04,310] INFO Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x1001764b3920000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,315] INFO [ZooKeeperClient ZkClient] Connected. (kafka.zookeeper.ZooKeeperClient)
{code}
2) The Kafka client on top of the Zookeeper client is used to register the broker. The multiTransaction API of the Zookeeper client is invoked and a multi request is sent to Zookeeper, containing CreateNode and SetData operations.
{code:java}
[2023-03-29 16:22:04,470] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient)
sessionid:0x1001764b3920000 type:multi cxid:0x1 zxid:0xcc1 txntype:14 reqpath:n/a
[2023-03-29 16:22:04,560] INFO Stat of the created znode at /brokers/ids/18 is: 3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265 (kafka.zk.KafkaZkClient)
[2023-03-29 16:22:04,561] INFO Registered broker 18 at path /brokers/ids/18 with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 3265 (kafka.zk.KafkaZkClient)
{code}
3) The client sends a Ping request every 6 seconds (read timeout / 2). The responses to these pings are not sent back to the client (this is enforced by the test; see the "Dropping" keyword in stdout). After 12 seconds (read timeout), the client initiates a new connection.
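The Stat logged in step 2 ties several identifiers of this report together. This sketch assumes the comma-separated fields follow the order of the Stat record in ZooKeeper's jute definition: czxid, mzxid, ctime, mtime, version, cversion, aversion, ephemeralOwner, dataLength, numChildren, pzxid. Under that assumption:
 * the czxid 3265 is the zxid 0xcc1 of the multi transaction, which Kafka reports as the broker epoch;
 * the ephemeralOwner 72083315314786304 is the session 0x1001764b3920000;
 * version 1 reflects the SetData applied after the create in the same multi.

`StatLine` is a hypothetical parser written only for this illustration.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the comma-separated Stat printed by Kafka in step 2.
// The field order is assumed to follow the Stat record in zookeeper.jute.
final class StatLine {
    private static final String[] FIELDS = {
        "czxid", "mzxid", "ctime", "mtime", "version", "cversion",
        "aversion", "ephemeralOwner", "dataLength", "numChildren", "pzxid"
    };

    private StatLine() {}

    static Map<String, Long> parse(String csv) {
        String[] values = csv.split(",");
        if (values.length != FIELDS.length) {
            throw new IllegalArgumentException(
                "expected " + FIELDS.length + " fields, got " + values.length);
        }
        Map<String, Long> stat = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            stat.put(FIELDS[i], Long.parseLong(values[i].trim()));
        }
        return stat;
    }

    public static void main(String[] args) {
        Map<String, Long> stat =
            parse("3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265");
        // czxid matches zxid:0xcc1 of the multi transaction.
        System.out.println("czxid = 0x" + Long.toHexString(stat.get("czxid")));                   // czxid = 0xcc1
        // ephemeralOwner is the session that created the znode.
        System.out.println("ephemeralOwner = 0x" + Long.toHexString(stat.get("ephemeralOwner"))); // ephemeralOwner = 0x1001764b3920000
        System.out.println("version = " + stat.get("version"));                                   // version = 1
    }
}
{code}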
{code:java}
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[2023-03-29 16:22:16,555] WARN Client session timed out, have not heard from server in 12001ms for session id 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,559] WARN Session 0x1001764b3920000 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,567] INFO channel is disconnected: [id: 0x2e3e926b, L:/127.0.0.1:50093 ! R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
{code}
4) A new TCP connection is opened and the Connect request is sent. A delay is artificially injected to make the connection request time out (the connection timeout is equal to the session timeout in this case).
{code:java}
[2023-03-29 16:22:18,388] INFO Opening socket connection to server localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,395] INFO Socket connection established, initiating session, client: /[0:0:0:0:0:0:0:1]:50121, server: localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,396] INFO channel is connected: [id: 0xc39fba08, L:/[0:0:0:0:0:0:0:1]:50121 - R:localhost/[0:0:0:0:0:0:0:1]:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
>>>> CONNECTION DELAY = 18500 ms
{code}
5) After 18 seconds (which is both the session and the connection timeout), the server expires the session. An internal closeSession request is generated on the server and enqueued to be processed by the synchronous request processor.
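Step 5 describes the server expiring the session on its own schedule and only then enqueuing a closeSession request. As far as we understand ZooKeeper's session tracker, expiry deadlines are not checked continuously. Each time a session is touched, its deadline is rounded up to the next multiple of the server's tickTime, and all sessions in a bucket that has passed are expired together.

The sketch below is a simplified, hypothetical model of that rounding. `ExpiryBuckets` is not a ZooKeeper class, and the 2000 ms tickTime and the timestamps are illustrative values only. It shows why an expiry may be observed up to one tick later than "last touch + session timeout".

{code:java}
// Simplified, hypothetical model of tick-bucketed session expiry, loosely based on how
// ZooKeeper's server-side session tracker is understood to work. Not ZooKeeper code.
final class ExpiryBuckets {
    private ExpiryBuckets() {}

    /** Rounds a time up to the start of the next tick interval. */
    static long roundToNextInterval(long timeMs, long tickMs) {
        return (timeMs / tickMs + 1) * tickMs;
    }

    /** Deadline assigned to a session last touched at lastTouchMs. */
    static long expiryDeadline(long lastTouchMs, long sessionTimeoutMs, long tickMs) {
        return roundToNextInterval(lastTouchMs + sessionTimeoutMs, tickMs);
    }

    /** True if an expiry pass running at nowMs would submit a closeSession for the session. */
    static boolean expiredAt(long nowMs, long lastTouchMs, long sessionTimeoutMs, long tickMs) {
        return nowMs >= expiryDeadline(lastTouchMs, sessionTimeoutMs, tickMs);
    }

    public static void main(String[] args) {
        long tickMs = 2_000;            // illustrative tickTime
        long sessionTimeoutMs = 18_000; // negotiated session timeout in the logs
        long lastTouchMs = 16_500;      // illustrative: last time the server heard from the session
        System.out.println(expiryDeadline(lastTouchMs, sessionTimeoutMs, tickMs));    // 36000
        System.out.println(expiredAt(35_000, lastTouchMs, sessionTimeoutMs, tickMs)); // false
        System.out.println(expiredAt(36_000, lastTouchMs, sessionTimeoutMs, tickMs)); // true
    }
}
{code}

In this model, expiry is decoupled from request handling. This is what allows requests from another session to be processed before the closeSession that eventually removes the expired session's ephemeral znodes.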
{code:java}
[2023-03-29 16:22:34,638] INFO Expiring session 0x1001764b3920000, timeout of 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
sessionid:0x1001764b3920000 type:closeSession cxid:0x0 zxid:0xcc3 txntype:-11 reqpath:n/a
[2023-03-29 16:22:36,401] WARN Client session timed out, have not heard from server in 18005ms for session id 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:36,401] WARN Session 0x1001764b3920000 for server localhost/[0:0:0:0:0:0:0:1]:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
{code}
6) In parallel, the client detects the connection timeout and reconnects. A new TCP connection is opened, and the server notifies the client that the session 0x1001764b3920000 has expired.
{code:java}
[2023-03-29 16:22:37,972] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO Socket connection established, initiating session, client: /127.0.0.1:50131, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO channel is connected: [id: 0x17aecbd7, L:/127.0.0.1:50131 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-29 16:22:37,988] INFO Invalid session 0x1001764b3920000 for client /127.0.0.1:50131, probably expired (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-29 16:22:37,989] WARN Unable to reconnect to ZooKeeper service, session 0x1001764b3920000 has expired (org.apache.zookeeper.ClientCnxn)
{code}
7) The client closes its "handle" (~client in Zookeeper terminology) with Zookeeper.
{code:java}
[2023-03-29 16:22:37,994] INFO EventThread shut down for session: 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,996] INFO [ZooKeeperClient ZkClient] Session expired. (kafka.zookeeper.ZooKeeperClient)
[2023-03-29 16:22:37,998] INFO [ZooKeeperClient ZkClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
{code}
8) The client creates a new handle, which initiates a new session with Zookeeper. A new TCP connection is opened, and the createSession request is propagated to the synchronous request processor. The server creates the session, but the response is never sent back to the client.
{code:java}
[2023-03-29 16:22:37,998] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@649bec2e (org.apache.zookeeper.ZooKeeper)
[2023-03-29 16:22:37,999] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,005] INFO Socket connection established, initiating session, client: /127.0.0.1:50132, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,006] INFO channel is connected: [id: 0x435b25a4, L:/127.0.0.1:50132 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
[ALTERED RQ] sessionid:0x1001764b3920001 type:createSession cxid:0x0 zxid:0xcc4 txntype:-10 reqpath:n/a
Dropping [Ljava.nio.ByteBuffer;@2a9f4f64
{code}
9) An inter-thread signal, triggered during the creation of the second session on the synchronous request processor, is captured by the main (application) thread, which starts a broker registration. The client immediately sends the multi request to the server over the open TCP connection. It does so before receiving confirmation that the ConnectRequest was processed, and without knowing the ID of the current session. The server processes the multi request and creates the znode /brokers/ids/18 under the session 0x1001764b3920001. The response is not sent to the client, which is therefore unaware that the request has been processed.
{code:java}
[2023-03-29 16:22:38,023] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient)
[ALTERED RQ] sessionid:0x1001764b3920001 type:multi cxid:0x1 zxid:0xcc5 txntype:14 reqpath:n/a
Dropping 1,3269,0 org.apache.zookeeper.MultiResponse@99bc5055
{code}
10) The connection times out again, since no response is sent to the client within 18 seconds. The Zookeeper client library returns a CONNECTIONLOSS error for the multi request. The Kafka client built on top of the Zookeeper client retries on this type of error, so it waits for a new connection to be established before retrying.
{code:java}
[2023-03-29 16:22:56,012] WARN Client session timed out, have not heard from server in 18005ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:56,014] WARN Session 0x0 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
{code}
11) A new connection is established. This time, we allow it to go through.
{code:java}
[2023-03-29 16:22:57,490] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,494] INFO Socket connection established, initiating session, client: /127.0.0.1:50166, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,495] INFO channel is connected: [id: 0xa086adad, L:/127.0.0.1:50166 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920002 type:createSession cxid:0x0 zxid:0xcc6 txntype:-10 reqpath:n/a
[2023-03-29 16:22:57,522] INFO Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x1001764b3920002, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,522] INFO [ZooKeeperClient ZkClient] Connected. (kafka.zookeeper.ZooKeeperClient)
{code}
12) The multi request is retried and the NODEEXISTS response is received.
Kafka then sends a getData to Zookeeper to find the ephemeral owner of the znode. The ephemeral owner is 0x1001764b3920001 (72082573385007105) which matches neither the current session 0x1001764b3920002 (72083315314786306) nor the previously recorded one in the Kafka client (0x1001764b3920000) during the first znode creation. {code:java} sessionid:0x1001764b3920002 type:multi cxid:0x2 zxid:0xcc7 txntype:14 reqpath:n/a sessionid:0x1001764b3920002 type:getData cxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:/brokers/ids/18 [2023-03-29 16:22:57,539] ERROR Error while creating ephemeral at /brokers/ids/18, node already exists and owner '72083315314786305' does not match current session '72083315314786306' (kafka.zk.KafkaZkClient$CheckedEphemeral) org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists at org.apache.zookeeper.KeeperException.create(KeeperException.java:126) at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:2185) at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:2123) at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:2090) at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:102) at repro.BrokerRegistrationTest.main(BrokerRegistrationTest.java:172) {code} Note that on the diagram, we introduce a delay in the expiration of the session 0x1001764b3920001. The expiration of the session, and deletion of its associated ephemeral nodes, is scheduled on the synchronous processor asynchronously of incoming requests, such that it is possible for the multi request to come and be consumed by the synchronous processor before the closeSession request for 0x1001764b3920001. The ephemeral znode for that session is therefore still present in Zookeeper's data tree when the new multi request is processed. was: Our production environment faced a use case where registration of a broker failed due to the presence of a "conflicting" broker znode in Zookeeper. 
This case is not without familiarity to that fixed by KAFKA-6584 and induced by the Zookeeper bug (or feature) tracked in ZOOKEEPER-2985 opened as of today. A network partition disturbed communication channels between the Kafka and Zookeeper clusters for about 20% of the brokers in the cluster. One of this broker was not able to re-register with Zookeeper and was excluded from the cluster until it was restarted. Broker logs show the failed registration due to a "conflicting" znode write which in this case does not exactly match the scenario covered by KAFKA-6584. The sequence of logs on the broker is as follows. First, a connection is established with the Zookeeper node 3. {code:java} [2023-03-05 16:01:55,342] INFO Socket connection established, initiating session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 (org.apache.zookeeper.ClientCnxn) [2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] (org.apache.zookeeper.ClientCnxnSocketNetty){code} An existing Zookeeper session was expired, and upon reconnection, the Zookeeper state change handler was invoked. The creation of the ephemeral znode /brokers/ids/18 started on the controller thread. {code:java} [2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient){code} The client "session" timed out after 6 seconds. Note the session is 0x0 and the absence of "{_}Session establishment complete{_}" log: the broker appears to have never received or processed the response from the Zookeeper node. {code:java} [2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from server in 6000ms for sessionid 0x0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, L:/1.2.3.4:40200 ! 
R:zk.3/5.6.7.8:2182] (org.apache.zookeeper.ClientCnxnSocketNetty){code} Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client started waiting on a new connection notification. {code:java} [2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient){code} A new connection was created with the Zookeeper node 1. Note that a valid (new) session ({{{}0x1006c6e0b830001{}}}) was reported by Kafka this time. {code:java} [2023-03-05 16:02:02,037] INFO Socket connection established, initiating session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 (org.apache.zookeeper.ClientCnxn) [2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] (org.apache.zookeeper.ClientCnxnSocketNetty) [2023-03-05 16:02:03,054] INFO Session establishment complete on server zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn){code} The Kafka ZK client is notified of the connection. {code:java} [2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient){code} The broker sends the request to create the znode {{/brokers/ids/18}} which already exists. The error path implemented for KAFKA-6584 is then followed. However, in this case, the session owning the ephemeral node {{0x300000043230ac1}} ({{{}216172783240153793{}}}) is different from the last active Zookeeper session which the broker has recorded. And it is also different from the current session {{0x1006c6e0b830001}} ({{{}72176813933264897{}}}), hence the recreation of the broker znode is not attempted. 
{code:java} [2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at /brokers/ids/18, node already exists and owner '216172783240153793' does not match current session '72176813933264897' (kafka.zk.KafkaZkClient$CheckedEphemeral) org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists at org.apache.zookeeper.KeeperException.create(KeeperException.java:126) at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821) at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759) at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726) at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95) at kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810) at kafka.controller.KafkaController.process(KafkaController.scala:1853) at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51) at kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127) at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code} The session {{0x300000043230ac1}} expires later on as indicated in Zookeeper server logs: {code:java} [2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer) [2023-03-05 16:02:21,336] INFO Submitting global closeSession request for session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer) {code} The ephemeral znode is then deleted and never recreated. The broker is not registered. 
Only a broker restart (or forced recreation of the Zookeeper session) can mitigate at this point. An analysis of the commit logs from Zookeeper does show the following sequence of transactions with timestamps from the Zookeeper node. - 2023-03-05T16:02:00.973Z: CreationSession [-10] => 0x300000043230ac1 - 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), SetData(<BrokerInfo>)] - 2023-03-05T16:02:21.336Z: CloseSession [-11] The fix for KAFKA-6584 does not cover this case because here, the session ID is not surfaced and recorded by the ZK client in Kafka (there was no lower-level logs to ascertain if the Netty client ever received any response for that session). As a remediation, perhaps the source of identity of the broker (currently conveyed by the Zookeeper session ID) could be explicitly added to the znode data (assuming the Zookeeper Multi is atomic, the znode must have the BrokerInfo (or any other user data provided with the SetData command) if and only if it is successfully created). ---- *Update:* this can be reproduced with this [automated test|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg]. The sequence of events produced by the test is the following. !phoque.png 1) The Zookeeper client is created by the application. It opens a TCP connection, then send a Connect request which is processed on the Netty NIO thread pool. A CreateSession request is internally enqueued to be handled by the synchronous request processor. Once processed, a session id is generated and recorded, and that session id 0x1001764b3920000 is returned to the client. 
{code:java} [2023-03-29 16:22:04,258] INFO Socket connection established, initiating session, client: /127.0.0.1:50093, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:04,268] INFO channel is connected: [id: 0x2e3e926b, L:/127.0.0.1:50093 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty) sessionid:0x1001764b3920000 type:createSession cxid:0x0 zxid:0xcc0 txntype:-10 reqpath:n/a [2023-03-29 16:22:04,310] INFO Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x1001764b3920000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:04,315] INFO [ZooKeeperClient ZkClient] Connected. (kafka.zookeeper.ZooKeeperClient){code} 2) The Kafka client on top of the Zookeeper client is used to register the broker. The multiTransaction API in the Zookeeper client is invoked and a multi request is sent to Zookeeper, with a CreateNode and SetData transactions. {code:java} [2023-03-29 16:22:04,470] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient) sessionid:0x1001764b3920000 type:multi cxid:0x1 zxid:0xcc1 txntype:14 reqpath:n/a [2023-03-29 16:22:04,560] INFO Stat of the created znode at /brokers/ids/18 is: 3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265 (kafka.zk.KafkaZkClient) [2023-03-29 16:22:04,561] INFO Registered broker 18 at path /brokers/ids/18 with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 3265 (kafka.zk.KafkaZkClient){code} 3) The client generates Ping request every 6 seconds (read timeout / 2). The response to these pings are not sent back to the client (enforced by the test, see the "Dropping" keyword in stdout). After 12 seconds (read timeout), the client initiates a new connection. 
{code:java} [ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Dropping -2,3265,0 [ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Dropping -2,3265,0 [2023-03-29 16:22:16,555] WARN Client session timed out, have not heard from server in 12001ms for session id 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:16,559] WARN Session 0x1001764b3920000 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:16,567] INFO channel is disconnected: [id: 0x2e3e926b, L:/127.0.0.1:50093 ! R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty){code} 4) A new TCP connection is opened and the Connect request sent. A delay is artificially injected to make the connection request time out (connection timeout, which is equal to the session timeout in this case). {code:java} [2023-03-29 16:22:18,388] INFO Opening socket connection to server localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:18,395] INFO Socket connection established, initiating session, client: /[0:0:0:0:0:0:0:1]:50121, server: localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:18,396] INFO channel is connected: [id: 0xc39fba08, L:/[0:0:0:0:0:0:0:1]:50121 - R:localhost/[0:0:0:0:0:0:0:1]:2181] (org.apache.zookeeper.ClientCnxnSocketNetty) >>>> CONNECTION DELAY = 18500 ms{code} 5) After 18 seconds (which is both the session and connection timeout), the server expires the session and an internal closeSession is generated on the server and enqueued to be processed by the synchronous request processor. 
{code:java} [2023-03-29 16:22:34,638] INFO Expiring session 0x1001764b3920000, timeout of 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer) sessionid:0x1001764b3920000 type:closeSession cxid:0x0 zxid:0xcc3 txntype:-11 reqpath:n/a [2023-03-29 16:22:36,401] WARN Client session timed out, have not heard from server in 18005ms for session id 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:36,401] WARN Session 0x1001764b3920000 for server localhost/[0:0:0:0:0:0:0:1]:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn){code} 6) In parallel, the client detects the connection timeout and reconnects. A new TCP connection is opened, the client is notified by the server that the session 0x10016b7f51c0000 has expired. {code:java} [2023-03-29 16:22:37,972] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:37,976] INFO Socket connection established, initiating session, client: /127.0.0.1:50131, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:37,976] INFO channel is connected: [id: 0x17aecbd7, L:/127.0.0.1:50131 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty) [2023-03-29 16:22:37,988] INFO Invalid session 0x1001764b3920000 for client /127.0.0.1:50131, probably expired (org.apache.zookeeper.server.ZooKeeperServer) [2023-03-29 16:22:37,989] WARN Unable to reconnect to ZooKeeper service, session 0x1001764b3920000 has expired (org.apache.zookeeper.ClientCnxn){code} 7) The client closes its "handle" (~client in Zookeeper terminology) with Zookeeper. {code:java} [2023-03-29 16:22:37,994] INFO EventThread shut down for session: 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:37,996] INFO [ZooKeeperClient ZkClient] Session expired. 
(kafka.zookeeper.ZooKeeperClient) [2023-03-29 16:22:37,998] INFO [ZooKeeperClient ZkClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient){code} 8) The client creates a new handle which initiates a new session with Zookeeper. A new TCP connection is opened, and the createSession is propagated to the synchronous request processor. The session is created by the server, but the response never sent back to the client. {code:java} [2023-03-29 16:22:37,998] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@649bec2e (org.apache.zookeeper.ZooKeeper) [2023-03-29 16:22:37,999] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:38,005] INFO Socket connection established, initiating session, client: /127.0.0.1:50132, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) [2023-03-29 16:22:38,006] INFO channel is connected: [id: 0x435b25a4, L:/127.0.0.1:50132 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty) [ALTERED RQ] sessionid:0x1001764b3920001 type:createSession cxid:0x0 zxid:0xcc4 txntype:-10 reqpath:n/a Dropping [Ljava.nio.ByteBuffer;@2a9f4f64{code} 9) An inter-thread signal triggered during the creation of the second session on the synchronous request processor is captured by the main (application) thread which starts a broker registration. The client immediately sends the multi request to the server using the opened TCP connection even before receiving confirmation of the successful processing of the ConnectRequest, and without knowing the id of the current session. The server processes the multi request and creates the znode /brokers/ids/18 using the session 0x1001764b3920001. The response is not sent to the client, which is not made aware the request has been processed. 
{code:java}
[2023-03-29 16:22:38,023] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient)
[ALTERED RQ] sessionid:0x1001764b3920001 type:multi cxid:0x1 zxid:0xcc5 txntype:14 reqpath:n/a
Dropping 1,3269,0 org.apache.zookeeper.MultiResponse@99bc5055{code}
10) The connection times out again, because no response reaches the client within 18 seconds. The Zookeeper client library fails the pending multi request with a {{CONNECTIONLOSS}} error. Kafka's Zookeeper client, built on top of that library, retries on this type of error: it waits for a new connection to be established and then resends the request.
{code:java}
[2023-03-29 16:22:56,012] WARN Client session timed out, have not heard from server in 18005ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:56,014] WARN Session 0x0 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn){code}
11) A new connection attempt is made. This time, we allow it to go through.
{code:java}
[2023-03-29 16:22:57,490] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,494] INFO Socket connection established, initiating session, client: /127.0.0.1:50166, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,495] INFO channel is connected: [id: 0xa086adad, L:/127.0.0.1:50166 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920002 type:createSession cxid:0x0 zxid:0xcc6 txntype:-10 reqpath:n/a
[2023-03-29 16:22:57,522] INFO Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x1001764b3920002, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,522] INFO [ZooKeeperClient ZkClient] Connected. (kafka.zookeeper.ZooKeeperClient){code}
12) The multi request is retried and the response {{NODEEXISTS}} is received.
Kafka then sends a getData request to Zookeeper to find the ephemeral owner of the znode. The ephemeral owner is 0x1001764b3920001 (72083315314786305). This matches neither the current session 0x1001764b3920002 (72083315314786306) nor the session the Kafka client recorded during the first znode creation (0x1001764b3920000, i.e. 72083315314786304).
{code:java}
sessionid:0x1001764b3920002 type:multi cxid:0x2 zxid:0xcc7 txntype:14 reqpath:n/a
sessionid:0x1001764b3920002 type:getData cxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:/brokers/ids/18
[2023-03-29 16:22:57,539] ERROR Error while creating ephemeral at /brokers/ids/18, node already exists and owner '72083315314786305' does not match current session '72083315314786306' (kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
	at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:2185)
	at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:2123)
	at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:2090)
	at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:102)
	at repro.BrokerRegistrationTest.main(BrokerRegistrationTest.java:172)
{code}
Note that, as shown on the diagram, we introduce a delay in the expiration of session 0x1001764b3920001. The synchronous processor schedules session expiration, and the deletion of the session's ephemeral nodes, asynchronously with respect to incoming requests. The retried multi request can therefore reach the synchronous processor and be consumed before the closeSession request for 0x1001764b3920001. As a result, the ephemeral znode owned by that session is still present in Zookeeper's data tree when the retried multi request is processed.
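The decimal values in the step 12 error log are the hexadecimal session ids from the Zookeeper logs, printed as signed 64-bit longs. The sketch below redoes that conversion. It then applies the decision this issue describes for the KAFKA-6584 error path:
* accept the existing znode if its owner is the current session;
* re-create it if the owner is the session previously recorded by the broker;
* fail otherwise.

{{SessionIdCheck}}, {{Outcome}} and {{afterNodeExists}} are hypothetical names used for illustration. This is a simplified model, not the code of {{KafkaZkClient$CheckedEphemeral}}.

{code:java}
/**
 * Hypothetical illustration (not Kafka's CheckedEphemeral): converts the
 * session ids from the repro logs and applies the ownership decision that
 * this issue describes for the KAFKA-6584 error path.
 */
final class SessionIdCheck {

    enum Outcome { OK, RECREATE, NODE_EXISTS }

    // Long.decode accepts the "0x" prefix used in the Zookeeper logs; these
    // ids are below 2^63, so they fit in a signed long without overflow.
    static final long RECORDED = Long.decode("0x1001764b3920000"); // session recorded at first registration
    static final long LOST = Long.decode("0x1001764b3920001");     // session whose responses were dropped
    static final long CURRENT = Long.decode("0x1001764b3920002");  // session used for the retried multi

    /** Decision taken after NODEEXISTS, given the znode's ephemeral owner. */
    static Outcome afterNodeExists(long ephemeralOwner, long currentSession, long recordedSession) {
        if (ephemeralOwner == currentSession) {
            return Outcome.OK;          // znode created by this very session
        }
        if (ephemeralOwner == recordedSession) {
            return Outcome.RECREATE;    // stale znode from the recorded session: delete and re-create
        }
        return Outcome.NODE_EXISTS;     // unknown owner: registration fails
    }

    public static void main(String[] args) {
        System.out.println(RECORDED + " " + LOST + " " + CURRENT);
        // prints: 72083315314786304 72083315314786305 72083315314786306
        System.out.println(afterNodeExists(LOST, CURRENT, RECORDED));
        // prints: NODE_EXISTS
    }
}
{code}

With the ids from step 12, the owner (...0001) is neither the current session (...0002) nor the recorded one (...0000). The sketch therefore lands on {{NODE_EXISTS}}, which matches the ERROR line in the log.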
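The ordering argument in the note can be illustrated with a toy model. The sketch assumes a single-threaded processor that applies queued requests strictly in order and tracks only ephemeral znodes, as a map from path to owner session. It is a simplification written for this illustration, not Zookeeper's request-processor pipeline. {{DelayedCloseSessionModel}} and {{Request}} are hypothetical names. The model replays three requests in this order:
* the multi from the lost session 0x1001764b3920001;
* the retried multi from session 0x1001764b3920002;
* the delayed closeSession for 0x1001764b3920001.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/**
 * Toy model (hypothetical, not Zookeeper's implementation): a single-threaded
 * processor that applies requests strictly in enqueue order and only tracks
 * ephemeral znodes as a map from path to owner session id.
 */
final class DelayedCloseSessionModel {

    /** Hypothetical request: "multi" creates an ephemeral, "closeSession" expires a session. */
    static final class Request {
        final String type;
        final long sessionId;
        final String path; // only used by "multi"

        Request(String type, long sessionId, String path) {
            this.type = type;
            this.sessionId = sessionId;
            this.path = path;
        }
    }

    final Map<String, Long> ephemeralOwners = new HashMap<>();
    final List<String> events = new ArrayList<>();

    void apply(Request r) {
        if (r.type.equals("multi")) {
            // Create the ephemeral only if the path is free; otherwise report the existing owner.
            Long existing = ephemeralOwners.putIfAbsent(r.path, r.sessionId);
            events.add(existing == null
                    ? "created " + r.path + " owner=0x" + Long.toHexString(r.sessionId)
                    : "NODEEXISTS " + r.path + " owner=0x" + Long.toHexString(existing));
        } else if (r.type.equals("closeSession")) {
            // Closing a session deletes every ephemeral znode it owns.
            ephemeralOwners.values().removeIf(owner -> owner == r.sessionId);
            events.add("closed 0x" + Long.toHexString(r.sessionId));
        }
    }

    static DelayedCloseSessionModel run() {
        long lost = 0x1001764b3920001L;    // step 9: multi applied, response dropped
        long current = 0x1001764b3920002L; // steps 11-12: session of the retried multi
        Queue<Request> queue = new ArrayDeque<>();
        queue.add(new Request("multi", lost, "/brokers/ids/18"));
        queue.add(new Request("multi", current, "/brokers/ids/18"));
        queue.add(new Request("closeSession", lost, null)); // delayed expiration
        DelayedCloseSessionModel model = new DelayedCloseSessionModel();
        while (!queue.isEmpty()) {
            model.apply(queue.poll());
        }
        return model;
    }

    public static void main(String[] args) {
        DelayedCloseSessionModel model = run();
        model.events.forEach(System.out::println);
        System.out.println("registered: " + model.ephemeralOwners.containsKey("/brokers/ids/18"));
    }
}
{code}

Running {{main}} prints three events in order: a creation owned by 0x1001764b3920001, {{NODEEXISTS}} for the retried multi, then the closure of that session. The final {{registered: false}} corresponds to the broker znode being deleted by the delayed closeSession, with nothing re-creating it.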
> Broker ZNode creation can fail due to lost Zookeeper Session ID
> ---------------------------------------------------------------
>
>                 Key: KAFKA-14845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14845
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Alexandre Dupriez
>            Assignee: Alexandre Dupriez
>            Priority: Minor
>         Attachments: kafka-broker-reg.log, phoque.png

--
This message was sent by Atlassian Jira
(v8.20.10#820010)