This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a20008899b7 [fix][broker] Fix PulsarService.closeAsync where
Condition.signalAll was called without holding a lock (#25777)
a20008899b7 is described below
commit a20008899b7a89171ae44d40893eb353bb78ab70
Author: zzb <[email protected]>
AuthorDate: Fri May 15 17:07:17 2026 +0800
[fix][broker] Fix PulsarService.closeAsync where Condition.signalAll was
called without holding a lock (#25777)
Co-authored-by: zhaizhibo <[email protected]>
---
.../org/apache/pulsar/broker/PulsarService.java | 9 +++++--
.../pulsar/broker/PulsarServiceCloseTest.java | 28 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 85bd9f53b1d..4fe6196c05c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -754,8 +754,13 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
} else {
log.warn().exception(t).log("Closed with errors");
}
- state = State.Closed;
- isClosedCondition.signalAll();
+ mutex.lock();
+ try {
+ state = State.Closed;
+ isClosedCondition.signalAll();
+ } finally {
+ mutex.unlock();
+ }
return null;
});
return closeFuture;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
index 3d6509baa8a..a75d54ad13e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
@@ -18,9 +18,12 @@
*/
package org.apache.pulsar.broker;
+import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -73,4 +76,29 @@ public class PulsarServiceCloseTest extends
MockedPulsarServiceBaseTest {
}
}
+ @Test(timeOut = 60_000)
+ public void testWaitUntilClosedConcurrentWithCloseAsync() throws Exception
{
+ // Start closeAsync() - it initiates close and returns a future
+ CompletableFuture<Void> closeFuture = pulsar.closeAsync();
+
+ // Start waitUntilClosed() in a separate thread BEFORE close completes.
+ // This thread will enter mutex.lock() -> await() and block there,
+ // relying on signalAll() to be woken up when close finishes.
+ CompletableFuture<Void> waitFuture = CompletableFuture.runAsync(() -> {
+ try {
+ pulsar.waitUntilClosed();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ });
+
+ try {
+ closeFuture.get(30, TimeUnit.SECONDS);
+ waitFuture.get(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ fail("Should not throw exception");
+ }
+ log.info("waitUntilClosed() returned successfully while closeAsync()
was in progress");
+ }
}