oneby-wang opened a new pull request, #25910:
URL: https://github.com/apache/pulsar/pull/25910

   ### Motivation
   
   `ZKSessionTest.testReacquireLeadershipAfterSessionLost` is flaky: it can observe an unexpected sequence of metadata session events after a ZooKeeper session expires and `PulsarZooKeeperClient` creates a replacement client.
   
   Failure case 1:
   ```
   java.lang.AssertionError: expected [SessionReestablished] but found [ConnectionLost]
        at org.testng.Assert.fail(Assert.java:111)
        at org.testng.Assert.failNotEquals(Assert.java:1590)
        at org.testng.Assert.assertEqualsImpl(Assert.java:150)
        at org.testng.Assert.assertEquals(Assert.java:132)
        at org.testng.Assert.assertEquals(Assert.java:644)
        at org.apache.pulsar.metadata.ZKSessionTest.testReacquireLeadershipAfterSessionLost(ZKSessionTest.java:230)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:141)
        at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
        at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
        at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
   ```
   
   Failure case 2:
   ```
   java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
        at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:781)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:253)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:253)
        at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
   Caused by: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
        ... 3 more
   Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
        ... 4 more
   Cause 1: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
        at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
        at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
        at app//org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
        at app//org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
        at app//org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
   Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
        at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
        at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
        at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
        ... 4 more
   ```
   
   The race happens during the handoff from the expired ZooKeeper instance to 
the new one:
   
   - `ZooKeeper` can deliver `SyncConnected` while the new client is still 
being constructed.
   - `ZooKeeperWatcherBase` forwards that session event to child watchers.
   - Those child watchers can run before `PulsarZooKeeperClient` publishes the 
new `ZooKeeper` handle.
   - During that window, follow-up operations can still be routed to the old 
expired handle, and an old async session probe can later overwrite the state of 
the newly established session.
   
   As a result, the test can observe extra or missing session transitions among `ConnectionLost`, `SessionLost`, `Reconnected`, and `SessionReestablished`.
   
   ### Modifications
   
   This change keeps the reconnect flow local to `PulsarZooKeeperClient` and 
`ZKSessionWatcher`.
   
   - `PulsarZooKeeperClient` now creates replacement ZooKeeper clients with a 
forwarding watcher instead of passing `watcherManager` directly.
   - The forwarding watcher waits until the new ZooKeeper handle has been 
published before forwarding events to `watcherManager`.
   - The new handle is published before releasing the forwarding watcher, and 
`waitForConnection()` runs after that release because it depends on the 
forwarded `SyncConnected` event.
   - `ZKSessionWatcher` records the session id used for its async `exists("/")` 
probe and only applies the probe result if the current session id still matches.
   - The session-id check and the state transition run inside the same synchronized section, so a stale probe cannot race with a new-session event and overwrite the new session state.
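
The two mechanisms above can be sketched roughly as follows. This is a minimal, dependency-free illustration and not the code from the patch: `GatedForwarder` and `SessionProbeGuard` are hypothetical names, events are plain values rather than ZooKeeper `WatchedEvent`s, and session states are plain strings. The description only says the forwarding watcher "waits"; this sketch buffers events and replays them on release instead of blocking the event thread, which is an assumption made to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical stand-in for the forwarding watcher: holds events until the new handle is published. */
final class GatedForwarder<E> {
    private final Consumer<E> downstream;   // plays the role of watcherManager
    private final Queue<E> pending = new ArrayDeque<>();
    private boolean released;

    GatedForwarder(Consumer<E> downstream) {
        this.downstream = downstream;
    }

    /** Called for every event delivered by the newly constructed client. */
    synchronized void process(E event) {
        if (released) {
            downstream.accept(event);
        } else {
            pending.add(event);             // handle not published yet: hold the event
        }
    }

    /** Called once, after the new handle has been published. */
    synchronized void release() {
        released = true;
        E event;
        while ((event = pending.poll()) != null) {
            downstream.accept(event);       // replay held events in arrival order
        }
    }
}

/** Hypothetical stand-in for the session-id guard around the async probe result. */
final class SessionProbeGuard {
    private long currentSessionId;
    private String state = "Unknown";

    /** A session event from the live handle updates id and state under one lock. */
    synchronized void onSessionEvent(long sessionId, String newState) {
        currentSessionId = sessionId;
        state = newState;
    }

    /** Applies a probe result only if the probe was issued under the still-current session. */
    synchronized boolean onProbeResult(long probeSessionId, String newState) {
        if (probeSessionId != currentSessionId) {
            return false;                   // stale probe from an older session: ignore it
        }
        state = newState;
        return true;
    }

    synchronized String state() {
        return state;
    }
}
```

In this sketch, an early `SyncConnected` passed to `process` stays queued until `release()` is called, which mirrors publishing the new handle first and only then letting child watchers run. A probe issued under session 1 and completing after session 2 is established is rejected by `onProbeResult`, so it cannot overwrite the newer state.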
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment

