This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7a9b5c7233d8ba7f85b6e45bc93168e8615e8d79
Author: zzb <[email protected]>
AuthorDate: Fri May 15 17:07:17 2026 +0800

    [fix][broker] Fix PulsarService.closeAsync where Condition.signalAll was called without holding a lock (#25777)

    Co-authored-by: zhaizhibo <[email protected]>
    (cherry picked from commit a20008899b7a89171ae44d40893eb353bb78ab70)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  9 +++++--
 .../pulsar/broker/PulsarServiceCloseTest.java      | 28 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d643ed8b6fe..a77e09d79ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -747,8 +747,13 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 } else {
                     LOG.warn("Closed with errors", t);
                 }
-                state = State.Closed;
-                isClosedCondition.signalAll();
+                mutex.lock();
+                try {
+                    state = State.Closed;
+                    isClosedCondition.signalAll();
+                } finally {
+                    mutex.unlock();
+                }
                 return null;
             });
             return closeFuture;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
index 683e15c2a02..8b3b8ad9110 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.broker;
 
+import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertTrue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -73,4 +76,29 @@ public class PulsarServiceCloseTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(timeOut = 60_000)
+    public void testWaitUntilClosedConcurrentWithCloseAsync() throws Exception {
+        // Start closeAsync() - it initiates close and returns a future
+        CompletableFuture<Void> closeFuture = pulsar.closeAsync();
+
+        // Start waitUntilClosed() in a separate thread BEFORE close completes.
+        // This thread will enter mutex.lock() -> await() and block there,
+        // relying on signalAll() to be woken up when close finishes.
+        CompletableFuture<Void> waitFuture = CompletableFuture.runAsync(() -> {
+            try {
+                pulsar.waitUntilClosed();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        });
+
+        try {
+            closeFuture.get(30, TimeUnit.SECONDS);
+            waitFuture.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            fail("Should not throw exception");
+        }
+        log.info("waitUntilClosed() returned successfully while closeAsync() was in progress");
+    }
 }
