[ 
https://issues.apache.org/jira/browse/KAFKA-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-14845:
--------------------------------------
    Attachment: phoque.png

> Broker ZNode creation can fail due to lost Zookeeper Session ID
> ---------------------------------------------------------------
>
>                 Key: KAFKA-14845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14845
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Alexandre Dupriez
>            Assignee: Alexandre Dupriez
>            Priority: Minor
>         Attachments: kafka-broker-reg.log, phoque.png
>
>
> Our production environment hit a case where the registration of a broker
> failed due to the presence of a "conflicting" broker znode in Zookeeper. This
> case resembles the one fixed by KAFKA-6584, which was induced by the
> Zookeeper bug (or feature) tracked in ZOOKEEPER-2985, still open as of today.
> A network partition disturbed communication channels between the Kafka and
> Zookeeper clusters for about 20% of the brokers in the cluster. One of these
> brokers was not able to re-register with Zookeeper and was excluded from the
> cluster until it was restarted. Broker logs show the failed registration due
> to a "conflicting" znode write, which in this case does not exactly match the
> scenario covered by KAFKA-6584.
> The sequence of logs on the broker is as follows.
> First, a connection is established with the Zookeeper node 3.
> {code:java}
> [2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
> session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
> L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty){code}
> An existing Zookeeper session had expired, and upon reconnection the
> Zookeeper state change handler was invoked. The creation of the ephemeral
> znode /brokers/ids/18 started on the controller thread.
> {code:java}
> [2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
> (kafka.zk.KafkaZkClient){code}
> The client "session" timed out after 6 seconds. Note that the session id is
> 0x0 and that there is no "{_}Session establishment complete{_}" log: the
> broker appears never to have received or processed the response from the
> Zookeeper node.
> {code:java}
> [2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
> server in 6000ms for sessionid 0x0, closing socket connection and attempting 
> reconnect (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
> L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty){code}
> Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
> started waiting on a new connection notification.
> {code:java}
> [2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient){code}
> A new connection was created with the Zookeeper node 1. Note that a valid 
> (new) session ({{0x1006c6e0b830001}}) was reported by Kafka this time.
> {code:java}
> [2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
> session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
> L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty)
> [2023-03-05 16:02:03,054] INFO Session establishment complete on server 
> zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 
> 18000 (org.apache.zookeeper.ClientCnxn){code}
> The Kafka ZK client is notified of the connection.
> {code:java}
> [2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient){code}
> The broker sends the request to create the znode {{/brokers/ids/18}}, which
> already exists. The error path implemented for KAFKA-6584 is then followed.
> However, in this case, the session owning the ephemeral node,
> {{0x300000043230ac1}} ({{216172783240153793}}), differs from the last active
> Zookeeper session recorded by the broker. It also differs from the current
> session {{0x1006c6e0b830001}} ({{72176813933264897}}), so the recreation of
> the broker znode is not attempted.
> {code:java}
> [2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
> /brokers/ids/18, node already exists and owner '216172783240153793' does not 
> match current session '72176813933264897' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
> NodeExists
>         at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
>         at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
>         at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
>         at 
> kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
>         at 
> kafka.controller.KafkaController.process(KafkaController.scala:1853)
>         at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
>         at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
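> The check that fails here can be modelled in a few lines. This is a simplified, self-contained sketch of the decision taken after a
> {{NodeExists}} error. It is based on our reading of {{KafkaZkClient$CheckedEphemeral.getAfterNodeExists}}, which appears in the stack trace above.
> The class, enum and method names are hypothetical, and no Zookeeper client is involved.
```java
import java.util.OptionalLong;

// Simplified model (hypothetical names, no ZooKeeper client involved) of the
// decision taken after a NodeExists error on the broker's ephemeral znode.
class EphemeralOwnerCheck {

    enum Outcome { OK, NODE_EXISTS, RETRY_CREATE }

    /**
     * @param ephemeralOwner owner session of the existing znode, or empty if the
     *                       znode disappeared before its data could be read
     * @param currentSession session id currently known to the Kafka ZK client
     */
    static Outcome afterNodeExists(OptionalLong ephemeralOwner, long currentSession) {
        if (!ephemeralOwner.isPresent()) {
            // The znode went away in the meantime: attempt the creation again.
            return Outcome.RETRY_CREATE;
        }
        // Only a znode owned by the current session is treated as our own.
        return ephemeralOwner.getAsLong() == currentSession ? Outcome.OK : Outcome.NODE_EXISTS;
    }

    public static void main(String[] args) {
        long owner = Long.parseUnsignedLong("300000043230ac1", 16);   // 216172783240153793
        long current = Long.parseUnsignedLong("1006c6e0b830001", 16); // 72176813933264897
        System.out.println(afterNodeExists(OptionalLong.of(owner), current)); // NODE_EXISTS
    }
}
```
> With the production values the outcome is {{NODE_EXISTS}}. The znode is left in place and no recreation is attempted.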
> The session {{0x300000043230ac1}} expires later, as indicated by the Zookeeper
> server logs:
> {code:java}
> [2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 
> 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> [2023-03-05 16:02:21,336] INFO Submitting global closeSession request for 
> session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
> {code}
> The ephemeral znode is then deleted and never recreated, and the broker
> remains unregistered. At this point, only a broker restart (or a forced
> recreation of the Zookeeper session) can mitigate the issue.
> An analysis of the Zookeeper commit logs shows the following sequence of
> transactions, with timestamps from the Zookeeper node:
>  - 2023-03-05T16:02:00.973Z: CreateSession [-10] => 0x300000043230ac1
>  - 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), 
> SetData(<BrokerInfo>)]
>  - 2023-03-05T16:02:21.336Z: CloseSession [-11]
> The fix for KAFKA-6584 does not cover this case because here the session id
> is never surfaced to, or recorded by, the ZK client in Kafka (there were no
> lower-level logs to ascertain whether the Netty client ever received a
> response for that session).
> As a remediation, perhaps the source of identity of the broker (currently
> conveyed by the Zookeeper session ID) could be explicitly added to the znode
> data. Assuming the Zookeeper Multi is atomic, the znode holds the BrokerInfo
> (or any other user data provided with the SetData command) if and only if it
> was successfully created.
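> The remediation idea can be illustrated with a toy, single-threaded sketch. A map stands in for the Zookeeper data tree.
> The per-process token and every name below are hypothetical and are not part of Kafka.
> Re-binding a recognised znode to the current session goes one step beyond the proposal. It is one possible way to act on it, not something settled here.
```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the proposed remediation: the znode data carries a
// token identifying the registering broker process, independently of sessions.
// Single-threaded toy: the map stands in for the Zookeeper data tree.
class TokenBasedRegistration {

    /** Toy ephemeral znode: owning session plus user data. */
    static final class Znode {
        final long owner;
        final String data;
        Znode(long owner, String data) { this.owner = owner; this.data = data; }
    }

    private final Map<String, Znode> tree;
    // Hypothetical per-process identity, generated once at broker start-up.
    private final String token = UUID.randomUUID().toString();

    TokenBasedRegistration(Map<String, Znode> tree) { this.tree = tree; }

    /** Returns true if the znode ends up owned by this broker's current session. */
    boolean register(String path, String brokerInfo, long currentSession) {
        String data = token + "|" + brokerInfo;
        // Stands in for the atomic Multi(CreateNode, SetData): data exists iff the node does.
        Znode existing = tree.putIfAbsent(path, new Znode(currentSession, data));
        if (existing == null || existing.owner == currentSession) {
            return true;
        }
        if (existing.data.startsWith(token + "|")) {
            // Ours, but bound to an older session that will expire and delete it:
            // re-bind it to the current session (delete + create in real Zookeeper).
            tree.put(path, new Znode(currentSession, data));
            return true;
        }
        return false; // registered by a different broker process
    }

    public static void main(String[] args) {
        Map<String, Znode> tree = new HashMap<>();
        TokenBasedRegistration broker = new TokenBasedRegistration(tree);
        broker.register("/brokers/ids/18", "PLAINTEXT://localhost:9092", 1L); // response "lost"
        System.out.println(broker.register("/brokers/ids/18", "PLAINTEXT://localhost:9092", 2L)); // true
    }
}
```
> Recognising the znode is only half of the problem. A znode still owned by an older session would be deleted when that session expires.
> That is why the sketch re-binds it. Against a real ensemble, the delete and create would need to be guarded, for example by version checks.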
> ----
> *Update:* this can be reproduced with this [automated 
> test|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg]. 
> The sequence of events produced by the test is the following.
> !broker-registration.drawio (4) .png!
> 1) The Zookeeper client is created by the application. It opens a TCP 
> connection, then sends a Connect request which is processed on the Netty NIO 
> thread pool. A CreateSession request is internally enqueued to be handled by 
> the synchronous request processor. Once it is processed, a session id is
> generated and recorded, and that session id, 0x1001764b3920000, is returned
> to the client.
> {code:java}
> [2023-03-29 16:22:04,258] INFO Socket connection established, initiating 
> session, client: /127.0.0.1:50093, server: localhost/127.0.0.1:2181 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:04,268] INFO channel is connected: [id: 0x2e3e926b, 
> L:/127.0.0.1:50093 - R:localhost/127.0.0.1:2181] 
> (org.apache.zookeeper.ClientCnxnSocketNetty)
> sessionid:0x1001764b3920000 type:createSession cxid:0x0 zxid:0xcc0 
> txntype:-10 reqpath:n/a
> [2023-03-29 16:22:04,310] INFO Session establishment complete on server 
> localhost/127.0.0.1:2181, session id = 0x1001764b3920000, negotiated timeout 
> = 18000 (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:04,315] INFO [ZooKeeperClient ZkClient] Connected. 
> (kafka.zookeeper.ZooKeeperClient){code}
> 2) The Kafka client built on top of the Zookeeper client is used to register
> the broker. The multiTransaction API in the Zookeeper client is invoked and a
> multi request containing CreateNode and SetData operations is sent to
> Zookeeper.
> {code:java}
> [2023-03-29 16:22:04,470] INFO Creating /brokers/ids/18 (is it secure? false) 
> (kafka.zk.KafkaZkClient)
> sessionid:0x1001764b3920000 type:multi cxid:0x1 zxid:0xcc1 txntype:14 
> reqpath:n/a
> [2023-03-29 16:22:04,560] INFO Stat of the created znode at /brokers/ids/18 
> is: 3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265 
> (kafka.zk.KafkaZkClient)
> [2023-03-29 16:22:04,561] INFO Registered broker 18 at path /brokers/ids/18 
> with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 3265 
> (kafka.zk.KafkaZkClient){code}
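> The {{Stat}} printed above is a comma-separated dump of the znode metadata.
> Assume the field order of the {{Stat}} record in Zookeeper's jute schema: czxid, mzxid, ctime, mtime, version, cversion, aversion, ephemeralOwner, dataLength, numChildren, pzxid.
> A small decoder can then cross-check the dump against the request log. The creation zxid 3265 is the {{0xcc1}} of the multi transaction, and the ephemeral owner is session {{0x1001764b3920000}}.
> The class and method names are illustrative only.
```java
import java.util.Arrays;
import java.util.List;

// Illustrative decoder for the comma-separated Stat dump logged by KafkaZkClient.
// Field order assumed from the Stat record of ZooKeeper's jute schema.
class StatDump {
    static final List<String> FIELDS = Arrays.asList(
        "czxid", "mzxid", "ctime", "mtime", "version", "cversion",
        "aversion", "ephemeralOwner", "dataLength", "numChildren", "pzxid");

    static long field(String dump, String name) {
        String[] values = dump.split(",");
        if (values.length != FIELDS.size()) {
            throw new IllegalArgumentException("expected " + FIELDS.size() + " fields");
        }
        int index = FIELDS.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field " + name);
        }
        return Long.parseLong(values[index].trim());
    }

    public static void main(String[] args) {
        String dump = "3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265";
        long czxid = field(dump, "czxid");          // also the broker epoch
        long owner = field(dump, "ephemeralOwner"); // owning session
        System.out.println("czxid = 0x" + Long.toHexString(czxid)); // czxid = 0xcc1
        System.out.println("owner = 0x" + Long.toHexString(owner)); // owner = 0x1001764b3920000
    }
}
```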
> 3) The client generates a Ping request every 6 seconds (read timeout / 2).
> The responses to these pings are not sent back to the client (enforced by the
> test; see the "Dropping" keyword in stdout). After 12 seconds (read timeout),
> the client initiates a new connection.
> {code:java}
> [ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe 
> zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
> Dropping -2,3265,0
> [ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe 
> zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
> Dropping -2,3265,0
> [2023-03-29 16:22:16,555] WARN Client session timed out, have not heard from 
> server in 12001ms for session id 0x1001764b3920000 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:16,559] WARN Session 0x1001764b3920000 for server 
> localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect 
> except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:16,567] INFO channel is disconnected: [id: 0x2e3e926b, 
> L:/127.0.0.1:50093 ! R:localhost/127.0.0.1:2181] 
> (org.apache.zookeeper.ClientCnxnSocketNetty){code}
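> The 6 s and 12 s figures follow from the negotiated session timeout of 18000 ms.
> To the best of our knowledge, Zookeeper's {{ClientCnxn}} sets its read timeout to two thirds of the session timeout. It sends a ping after the connection has been idle for roughly half of the read timeout.
> The sketch below only reproduces that arithmetic. It is not the client's code.
```java
// Arithmetic sketch of the client-side timers, assuming the ClientCnxn formulas
// readTimeout = sessionTimeout * 2 / 3 and ping interval ~= readTimeout / 2.
class ClientTimers {
    static int readTimeoutMs(int negotiatedSessionTimeoutMs) {
        return negotiatedSessionTimeoutMs * 2 / 3;
    }

    static int pingIntervalMs(int negotiatedSessionTimeoutMs) {
        return readTimeoutMs(negotiatedSessionTimeoutMs) / 2;
    }

    public static void main(String[] args) {
        int session = 18000; // negotiated timeout reported in the logs
        System.out.println(readTimeoutMs(session));  // 12000
        System.out.println(pingIntervalMs(session)); // 6000
    }
}
```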
> 4) A new TCP connection is opened and the Connect request sent. A delay is 
> artificially injected to make the connection request time out (connection 
> timeout, which is equal to the session timeout in this case).
> {code:java}
> [2023-03-29 16:22:18,388] INFO Opening socket connection to server 
> localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:18,395] INFO Socket connection established, initiating 
> session, client: /[0:0:0:0:0:0:0:1]:50121, server: 
> localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:18,396] INFO channel is connected: [id: 0xc39fba08, 
> L:/[0:0:0:0:0:0:0:1]:50121 - R:localhost/[0:0:0:0:0:0:0:1]:2181] 
> (org.apache.zookeeper.ClientCnxnSocketNetty)
> >>>> CONNECTION DELAY = 18500 ms{code}
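> As far as we know, the connection timeout is the session timeout divided by the number of servers in the connect string ({{connectTimeout = sessionTimeout / hostProvider.size()}} in {{ClientCnxn}}).
> With the single server of the test it equals the 18 s session timeout.
> With a three-server ensemble it would be 6 s, which would match the production log "have not heard from server in 6000ms for sessionid 0x0". The three-server setup is an assumption, consistent with the {{zk.1}} and {{zk.3}} hosts; {{zk.2}} below is hypothetical.
> Counting servers by splitting on commas is a simplification.
```java
// Arithmetic sketch, assuming connectTimeout = sessionTimeout / number of servers
// in the connect string, as computed by ZooKeeper's ClientCnxn.
class ConnectTimeout {
    static int connectTimeoutMs(int sessionTimeoutMs, String connectString) {
        int servers = connectString.split(",").length; // simplification
        return sessionTimeoutMs / servers;
    }

    public static void main(String[] args) {
        System.out.println(connectTimeoutMs(18000, "localhost:2181")); // 18000
        // Hypothetical three-node ensemble, in the spirit of the production hosts.
        System.out.println(connectTimeoutMs(18000, "zk.1:2182,zk.2:2182,zk.3:2182")); // 6000
    }
}
```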
> 5) After 18 seconds (which is both the session and connection timeout), the 
> server expires the session and an internal closeSession is generated on the 
> server and enqueued to be processed by the synchronous request processor.
> {code:java}
> [2023-03-29 16:22:34,638] INFO Expiring session 0x1001764b3920000, timeout of 
> 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> sessionid:0x1001764b3920000 type:closeSession cxid:0x0 zxid:0xcc3 txntype:-11 
> reqpath:n/a
> [2023-03-29 16:22:36,401] WARN Client session timed out, have not heard from 
> server in 18005ms for session id 0x1001764b3920000 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:36,401] WARN Session 0x1001764b3920000 for server 
> localhost/[0:0:0:0:0:0:0:1]:2181, Closing socket connection. Attempting 
> reconnect except it is a SessionExpiredException. 
> (org.apache.zookeeper.ClientCnxn){code}
> 6) In parallel, the client detects the connection timeout and reconnects. A
> new TCP connection is opened, and the server notifies the client that the
> session 0x1001764b3920000 has expired.
> {code:java}
> [2023-03-29 16:22:37,972] INFO Opening socket connection to server 
> localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:37,976] INFO Socket connection established, initiating 
> session, client: /127.0.0.1:50131, server: localhost/127.0.0.1:2181 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:37,976] INFO channel is connected: [id: 0x17aecbd7, 
> L:/127.0.0.1:50131 - R:localhost/127.0.0.1:2181] 
> (org.apache.zookeeper.ClientCnxnSocketNetty)
> [2023-03-29 16:22:37,988] INFO Invalid session 0x1001764b3920000 for client 
> /127.0.0.1:50131, probably expired 
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2023-03-29 16:22:37,989] WARN Unable to reconnect to ZooKeeper service, 
> session 0x1001764b3920000 has expired (org.apache.zookeeper.ClientCnxn){code}
> 7) The client closes its "handle" (~client in Zookeeper terminology) with 
> Zookeeper.
> {code:java}
> [2023-03-29 16:22:37,994] INFO EventThread shut down for session: 
> 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:37,996] INFO [ZooKeeperClient ZkClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
> [2023-03-29 16:22:37,998] INFO [ZooKeeperClient ZkClient] Initializing a new 
> session to localhost:2181. (kafka.zookeeper.ZooKeeperClient){code}
> 8) The client creates a new handle which initiates a new session with 
> Zookeeper. A new TCP connection is opened, and the createSession is 
> propagated to the synchronous request processor. The session is created by 
> the server, but the response is never sent back to the client.
> {code:java}
> [2023-03-29 16:22:37,998] INFO Initiating client connection, 
> connectString=localhost:2181 sessionTimeout=18000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@649bec2e 
> (org.apache.zookeeper.ZooKeeper)
> [2023-03-29 16:22:37,999] INFO Opening socket connection to server 
> localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:38,005] INFO Socket connection established, initiating 
> session, client: /127.0.0.1:50132, server: localhost/127.0.0.1:2181 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:38,006] INFO channel is connected: [id: 0x435b25a4, 
> L:/127.0.0.1:50132 - R:localhost/127.0.0.1:2181] 
> (org.apache.zookeeper.ClientCnxnSocketNetty)
> [ALTERED RQ] sessionid:0x1001764b3920001 type:createSession cxid:0x0 
> zxid:0xcc4 txntype:-10 reqpath:n/a
> Dropping [Ljava.nio.ByteBuffer;@2a9f4f64{code}
> 9) An inter-thread signal triggered during the creation of the second session
> on the synchronous request processor is captured by the main (application)
> thread, which starts a broker registration. The client immediately sends the
> multi request to the server over the open TCP connection, even before
> receiving confirmation that the ConnectRequest was processed successfully,
> and without knowing the id of the current session. The server processes the
> multi request and creates the znode /brokers/ids/18 under the session
> 0x1001764b3920001. The response is not sent to the client, which is therefore
> not made aware that the request has been processed.
> {code:java}
> [2023-03-29 16:22:38,023] INFO Creating /brokers/ids/18 (is it secure? false) 
> (kafka.zk.KafkaZkClient)
> [ALTERED RQ] sessionid:0x1001764b3920001 type:multi cxid:0x1 zxid:0xcc5 
> txntype:14 reqpath:n/a
> Dropping 1,3269,0
>  org.apache.zookeeper.MultiResponse@99bc5055{code}
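> Step 9 can be pictured with a toy model, which is not Zookeeper code. The client pipelines a create behind its ConnectRequest on the same connection.
> The server processes both in order, under the session it has just created.
> Because every response is dropped, the client's view of its session id stays at 0. That is why the following timeout logs report session 0x0.
> The class, its methods and the string-based requests are hypothetical.
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not ZooKeeper code): requests pipelined on one connection are
// executed in order under the session created by the preceding "connect".
class PipelinedRequests {
    final Map<String, Long> ephemeralOwners = new HashMap<>(); // path -> owner session
    private long nextSessionId;

    PipelinedRequests(long firstSessionId) { this.nextSessionId = firstSessionId; }

    /** Processes the requests received on one connection, in FIFO order. */
    List<String> serve(List<String> requests) {
        List<String> responses = new ArrayList<>();
        long session = 0L; // bound by the "connect" request on this connection
        for (String request : requests) {
            if (request.equals("connect")) {
                session = nextSessionId++;
                responses.add("connected 0x" + Long.toHexString(session));
            } else if (request.startsWith("create ")) {
                String path = request.substring("create ".length());
                boolean created = ephemeralOwners.putIfAbsent(path, session) == null;
                responses.add(created ? "OK" : "NODEEXISTS");
            }
        }
        return responses;
    }

    public static void main(String[] args) {
        PipelinedRequests server = new PipelinedRequests(0x1001764b3920001L);
        long clientSessionId = 0L; // only set when a ConnectResponse is received
        List<String> responses = server.serve(Arrays.asList("connect", "create /brokers/ids/18"));
        responses.clear(); // simulate the dropped responses: nothing reaches the client
        System.out.println("client session: 0x" + Long.toHexString(clientSessionId));
        System.out.println("znode owner: 0x" + Long.toHexString(server.ephemeralOwners.get("/brokers/ids/18")));
    }
}
```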
> 10) The connection times out again, since no response is sent to the client
> within 18 seconds. The Zookeeper client library returns a CONNECTIONLOSS
> error for the multi request. The Kafka ZK client built on top of it retries
> on this type of error, so it waits for a new connection to be established
> before retrying.
> {code:java}
> [2023-03-29 16:22:56,012] WARN Client session timed out, have not heard from 
> server in 18005ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:56,014] WARN Session 0x0 for server 
> localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect 
> except it is a SessionExpiredException. 
> (org.apache.zookeeper.ClientCnxn){code}
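> The retry behaviour can be sketched as a loop that re-sends a request until it gets a result other than {{CONNECTIONLOSS}}, waiting for a reconnection in between.
> This is a hypothetical, simplified model. As far as we can tell, {{KafkaZkClient}}'s {{retryRequestUntilConnected}} helper plays this role; the names below are invented.
> The key point is that a retried create is not idempotent, because the first attempt may already have been applied by the server.
```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Supplier;

// Simplified, hypothetical model of "retry until connected": a request that
// fails with CONNECTIONLOSS is re-sent once the client is connected again.
class RetryUntilConnected {
    enum Code { OK, CONNECTIONLOSS, NODEEXISTS }

    static Code retry(Supplier<Code> send, Runnable waitUntilConnected) {
        while (true) {
            Code code = send.get();
            if (code != Code.CONNECTIONLOSS) {
                return code;
            }
            waitUntilConnected.run(); // blocks until a new session is established
        }
    }

    public static void main(String[] args) {
        // First attempt: applied by the server but reported as CONNECTIONLOSS.
        // Second attempt: the znode now exists, so the server answers NODEEXISTS.
        Deque<Code> outcomes = new ArrayDeque<>(Arrays.asList(Code.CONNECTIONLOSS, Code.NODEEXISTS));
        System.out.println(retry(outcomes::poll, () -> { })); // NODEEXISTS
    }
}
```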
> 11) A new connection attempt is made. This time, we allow it to go 
> through.
> {code:java}
> [2023-03-29 16:22:57,490] INFO Opening socket connection to server 
> localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:57,494] INFO Socket connection established, initiating 
> session, client: /127.0.0.1:50166, server: localhost/127.0.0.1:2181 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:57,495] INFO channel is connected: [id: 0xa086adad, 
> L:/127.0.0.1:50166 - R:localhost/127.0.0.1:2181] 
> (org.apache.zookeeper.ClientCnxnSocketNetty)
> sessionid:0x1001764b3920002 type:createSession cxid:0x0 zxid:0xcc6 
> txntype:-10 reqpath:n/a
> [2023-03-29 16:22:57,522] INFO Session establishment complete on server 
> localhost/127.0.0.1:2181, session id = 0x1001764b3920002, negotiated timeout 
> = 18000 (org.apache.zookeeper.ClientCnxn)
> [2023-03-29 16:22:57,522] INFO [ZooKeeperClient ZkClient] Connected. 
> (kafka.zookeeper.ZooKeeperClient){code}
> 12) The multi request is retried and the response NODEEXISTS is received.
> Kafka then sends a getData request to Zookeeper to find the ephemeral owner of
> the znode. The ephemeral owner is 0x1001764b3920001 (72083315314786305), which
> matches neither the current session 0x1001764b3920002 (72083315314786306) nor
> the one previously recorded by the Kafka client (0x1001764b3920000) during
> the first znode creation.
> {code:java}
> sessionid:0x1001764b3920002 type:multi cxid:0x2 zxid:0xcc7 txntype:14 
> reqpath:n/a
> sessionid:0x1001764b3920002 type:getData cxid:0x3 zxid:0xfffffffffffffffe 
> txntype:unknown reqpath:/brokers/ids/18
> [2023-03-29 16:22:57,539] ERROR Error while creating ephemeral at 
> /brokers/ids/18, node already exists and owner '72083315314786305' does not 
> match current session '72083315314786306' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
> NodeExists
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>     at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:2185)
>     at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:2123)
>     at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:2090)
>     at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:102)
>     at repro.BrokerRegistrationTest.main(BrokerRegistrationTest.java:172)
> {code}
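> The Kafka error message prints session ids in decimal, while the Zookeeper logs print them in hexadecimal.
> A small conversion helper makes the comparison explicit. Its name is illustrative.
```java
// Helper to compare the hexadecimal session ids of the Zookeeper logs with the
// decimal values printed by KafkaZkClient.
class SessionIds {
    static long fromHex(String hex) {
        String digits = hex.startsWith("0x") ? hex.substring(2) : hex;
        return Long.parseUnsignedLong(digits, 16);
    }

    static String toHex(long sessionId) {
        return "0x" + Long.toHexString(sessionId);
    }

    public static void main(String[] args) {
        String[] ids = {"0x1001764b3920000", "0x1001764b3920001", "0x1001764b3920002"};
        for (String id : ids) {
            System.out.println(id + " = " + fromHex(id));
        }
    }
}
```
> The three sessions of the reproduction map to 72083315314786304, 72083315314786305 and 72083315314786306. The owner and current values therefore match the ones in the error message above.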
> Note that, on the diagram, we introduce a delay in the expiration of the
> session 0x1001764b3920001. The expiration of a session, and the deletion of
> its associated ephemeral nodes, is scheduled on the synchronous processor
> asynchronously with respect to incoming requests. It is therefore possible for
> the multi request to arrive and be consumed by the synchronous processor
> before the closeSession request for 0x1001764b3920001. The ephemeral znode for
> that session is then still present in Zookeeper's data tree when the new
> multi request is processed.
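> The ordering problem can be reproduced with a toy single-threaded processor, which is not Zookeeper code.
> In this model, the closeSession of the expired session is just another queued request.
> If it is queued behind the retried create of the newer session, that create sees NODEEXISTS. The closeSession then removes the znode, leaving the broker unregistered.
> All names are hypothetical.
```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model (not ZooKeeper code) of a single-threaded request processor. The
// closeSession of an expired session is just another queued request, so a
// create from a newer session may be processed before old ephemerals are removed.
class ExpiryRace {

    static final class Request {
        final String op;
        final long session;
        final String path;
        Request(String op, long session, String path) {
            this.op = op; this.session = session; this.path = path;
        }
    }

    final Map<String, Long> ephemerals = new HashMap<>(); // path -> owner session
    final Queue<Request> queue = new ArrayDeque<>();
    final List<String> results = new ArrayList<>();

    void submit(String op, long session, String path) {
        queue.add(new Request(op, session, path));
    }

    /** Processes queued requests strictly in FIFO order. */
    void drain() {
        Request req;
        while ((req = queue.poll()) != null) {
            if (req.op.equals("create")) {
                boolean created = ephemerals.putIfAbsent(req.path, req.session) == null;
                results.add(created ? "OK" : "NODEEXISTS");
            } else if (req.op.equals("closeSession")) {
                long closed = req.session;
                // Closing a session removes every ephemeral znode it owns.
                ephemerals.values().removeIf(owner -> owner == closed);
                results.add("CLOSED");
            }
        }
    }

    public static void main(String[] args) {
        ExpiryRace zk = new ExpiryRace();
        long previous = 0x1001764b3920001L;
        long current = 0x1001764b3920002L;
        zk.submit("create", previous, "/brokers/ids/18"); // step 9, never acknowledged
        zk.submit("create", current, "/brokers/ids/18");  // step 12, retried multi
        zk.submit("closeSession", previous, null);        // delayed expiration
        zk.drain();
        System.out.println(zk.results);                                   // [OK, NODEEXISTS, CLOSED]
        System.out.println(zk.ephemerals.containsKey("/brokers/ids/18")); // false
    }
}
```
> Submitting the closeSession before the second create instead yields [OK, CLOSED, OK]. The znode is then owned by the current session.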



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
