This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cf1cb2e8513 [fix][test] Fix flaky compaction disable test by 
controlling delete concurrency (#25794)
cf1cb2e8513 is described below

commit cf1cb2e8513573dfaca5327fffe23fad77301229
Author: void-ptr974 <[email protected]>
AuthorDate: Wed May 20 16:33:22 2026 +0800

    [fix][test] Fix flaky compaction disable test by controlling delete 
concurrency (#25794)
---
 .../persistent/CompactionConcurrencyTest.java      | 53 ++++++++++++++--------
 1 file changed, 35 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java
index b4f8a0807c0..d42f65d4c7b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java
@@ -19,13 +19,16 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.compaction.Compactor;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
@@ -56,27 +59,41 @@ public class CompactionConcurrencyTest extends 
SharedPulsarBaseTest {
         producer.newMessage().key("k0").value("v0").send();
         triggerCompactionAndWait(topicName);
         admin.topics().deleteSubscription(topicName, "s1");
+        PersistentTopic persistentTopic =
+                (PersistentTopic) getTopic(topicName, false).get().get();
 
         // Disable compaction.
         admin.topicPolicies().removeCompactionThreshold(topicName);
-        AtomicReference<CompletableFuture<Void>> f1 = new 
AtomicReference<CompletableFuture<Void>>();
-        AtomicReference<CompletableFuture<Void>> f2 = new 
AtomicReference<CompletableFuture<Void>>();
-        new Thread(() -> {
-            f1.set(admin.topics().deleteSubscriptionAsync(topicName, 
"__compaction"));
-        }).start();
-        new Thread(() -> {
-            f2.set(admin.topics().deleteSubscriptionAsync(topicName, 
"__compaction"));
-        }).start();
+        CompletableFuture<Long> originalCompaction = 
persistentTopic.currentCompaction;
+        CompletableFuture<Long> blockedCompaction = new CompletableFuture<>();
+        persistentTopic.currentCompaction = blockedCompaction;
+        try {
+            CompletableFuture<Void> firstDelete =
+                    admin.topics().deleteSubscriptionAsync(topicName, 
Compactor.COMPACTION_SUBSCRIPTION);
+            Awaitility.await().untilAsserted(() -> 
assertTrue(persistentTopic.disablingCompaction.get()));
 
-        // Verify: at least one of the requests should fail (the other may 
succeed or also fail
-        // with "not found" if the in-memory metadata store processes them 
sequentially).
-        Awaitility.await().untilAsserted(() -> {
-            assertTrue(f1.get() != null);
-            assertTrue(f2.get() != null);
-            assertTrue(f1.get().isDone());
-            assertTrue(f2.get().isDone());
-            assertTrue(f1.get().isCompletedExceptionally() || 
f2.get().isCompletedExceptionally());
-        });
+            CompletableFuture<Void> secondDelete =
+                    admin.topics().deleteSubscriptionAsync(topicName, 
Compactor.COMPACTION_SUBSCRIPTION);
+            Awaitility.await().untilAsserted(() -> 
assertTrue(secondDelete.isCompletedExceptionally()));
+            try {
+                secondDelete.join();
+                fail("The second concurrent compaction subscription delete 
should fail");
+            } catch (Exception ex) {
+                Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+                assertTrue(actEx instanceof 
PulsarAdminException.PreconditionFailedException);
+            }
+
+            blockedCompaction.complete(0L);
+            Awaitility.await().untilAsserted(() -> {
+                assertTrue(firstDelete.isDone());
+                assertFalse(firstDelete.isCompletedExceptionally());
+                assertFalse(persistentTopic.disablingCompaction.get());
+            });
+            firstDelete.join();
+        } finally {
+            blockedCompaction.complete(0L);
+            persistentTopic.currentCompaction = originalCompaction;
+        }
 
         // cleanup.
         producer.close();

Reply via email to