This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cf1cb2e8513 [fix][test] Fix flaky compaction disable test by controlling delete concurrency (#25794)
cf1cb2e8513 is described below
commit cf1cb2e8513573dfaca5327fffe23fad77301229
Author: void-ptr974 <[email protected]>
AuthorDate: Wed May 20 16:33:22 2026 +0800
[fix][test] Fix flaky compaction disable test by controlling delete concurrency (#25794)
---
.../persistent/CompactionConcurrencyTest.java | 53 ++++++++++++++--------
1 file changed, 35 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java
index b4f8a0807c0..d42f65d4c7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/CompactionConcurrencyTest.java
@@ -19,13 +19,16 @@
package org.apache.pulsar.broker.service.persistent;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
@@ -56,27 +59,41 @@ public class CompactionConcurrencyTest extends SharedPulsarBaseTest {
producer.newMessage().key("k0").value("v0").send();
triggerCompactionAndWait(topicName);
admin.topics().deleteSubscription(topicName, "s1");
+ PersistentTopic persistentTopic =
+ (PersistentTopic) getTopic(topicName, false).get().get();
// Disable compaction.
admin.topicPolicies().removeCompactionThreshold(topicName);
- AtomicReference<CompletableFuture<Void>> f1 = new
AtomicReference<CompletableFuture<Void>>();
- AtomicReference<CompletableFuture<Void>> f2 = new
AtomicReference<CompletableFuture<Void>>();
- new Thread(() -> {
- f1.set(admin.topics().deleteSubscriptionAsync(topicName,
"__compaction"));
- }).start();
- new Thread(() -> {
- f2.set(admin.topics().deleteSubscriptionAsync(topicName,
"__compaction"));
- }).start();
+ CompletableFuture<Long> originalCompaction =
persistentTopic.currentCompaction;
+ CompletableFuture<Long> blockedCompaction = new CompletableFuture<>();
+ persistentTopic.currentCompaction = blockedCompaction;
+ try {
+ CompletableFuture<Void> firstDelete =
+ admin.topics().deleteSubscriptionAsync(topicName,
Compactor.COMPACTION_SUBSCRIPTION);
+ Awaitility.await().untilAsserted(() ->
assertTrue(persistentTopic.disablingCompaction.get()));
- // Verify: at least one of the requests should fail (the other may
succeed or also fail
- // with "not found" if the in-memory metadata store processes them
sequentially).
- Awaitility.await().untilAsserted(() -> {
- assertTrue(f1.get() != null);
- assertTrue(f2.get() != null);
- assertTrue(f1.get().isDone());
- assertTrue(f2.get().isDone());
- assertTrue(f1.get().isCompletedExceptionally() ||
f2.get().isCompletedExceptionally());
- });
+ CompletableFuture<Void> secondDelete =
+ admin.topics().deleteSubscriptionAsync(topicName,
Compactor.COMPACTION_SUBSCRIPTION);
+ Awaitility.await().untilAsserted(() ->
assertTrue(secondDelete.isCompletedExceptionally()));
+ try {
+ secondDelete.join();
+ fail("The second concurrent compaction subscription delete
should fail");
+ } catch (Exception ex) {
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ assertTrue(actEx instanceof
PulsarAdminException.PreconditionFailedException);
+ }
+
+ blockedCompaction.complete(0L);
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(firstDelete.isDone());
+ assertFalse(firstDelete.isCompletedExceptionally());
+ assertFalse(persistentTopic.disablingCompaction.get());
+ });
+ firstDelete.join();
+ } finally {
+ blockedCompaction.complete(0L);
+ persistentTopic.currentCompaction = originalCompaction;
+ }
// cleanup.
producer.close();