This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f1018117335 [fix][broker] Fix compaction horizon might be reset to an
old position when phase two is interrupted (#25119)
f1018117335 is described below
commit f10181173357b13b690abb288b83a3a458292e2d
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Jan 4 19:03:56 2026 +0800
[fix][broker] Fix compaction horizon might be reset to an old position when
phase two is interrupted (#25119)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 8 +-
.../compaction/AbstractTwoPhaseCompactor.java | 6 +-
.../apache/pulsar/compaction/CompactionTest.java | 140 ++++++++++++++++-----
.../pulsar/compaction/StrategicCompactionTest.java | 45 ++++---
4 files changed, 150 insertions(+), 49 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ab15a6d6a17..b3cb8dc4596 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1572,7 +1572,12 @@ public class ManagedCursorImpl implements ManagedCursor {
final AsyncCallbacks.ResetCursorCallback callback =
resetCursorCallback;
- final Position newMarkDeletePosition =
ledger.getPreviousPosition(newReadPosition);
+ final Position newMarkDeletePosition;
+ if (isCompactionCursor()) {
+ newMarkDeletePosition = markDeletePosition;
+ } else {
+ newMarkDeletePosition =
ledger.getPreviousPosition(newReadPosition);
+ }
Runnable alignAcknowledgeStatusAfterPersisted = () -> {
// Correct the variable "messagesConsumedCounter".
@@ -1662,7 +1667,6 @@ public class ManagedCursorImpl implements ManagedCursor {
persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
- lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition,
getProperties(), null, null);
internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ?
getProperties() : Collections.emptyMap(),
new MarkDeleteCallback() {
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index ddfe8825a88..7aba181cb44 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.compaction;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.time.Duration;
@@ -59,6 +60,8 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
+ @VisibleForTesting
+ static Runnable injectionAfterSeekInPhaseTwo = () -> {};
private static final Logger log =
LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class);
protected static final int MAX_OUTSTANDING = 500;
protected final Duration phaseOneLoopReadTimeout;
@@ -188,6 +191,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends
Compactor {
CompletableFuture<Long> promise = new CompletableFuture<>();
reader.seekAsync(from).thenCompose((v) -> {
+ injectionAfterSeekInPhaseTwo.run();
Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
CompletableFuture<Void> loopPromise = new CompletableFuture<>();
phaseTwoLoop(reader, to, latestForKey, ledger, outstanding,
loopPromise, MessageId.earliest);
@@ -436,4 +440,4 @@ public abstract class AbstractTwoPhaseCompactor<T> extends
Compactor {
public long getPhaseOneLoopReadTimeoutInSeconds() {
return phaseOneLoopReadTimeout.getSeconds();
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 889eb2b8a35..077cf9d0b11 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -57,6 +57,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -109,7 +110,8 @@ import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -121,7 +123,13 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
protected BookKeeper bk;
private PublishingOrderCompactor compactor;
- @BeforeMethod
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setDispatcherMaxReadBatchSize(1);
+ }
+
+ @BeforeClass
@Override
public void setup() throws Exception {
super.internalSetup();
@@ -139,7 +147,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
compactor = new PublishingOrderCompactor(conf, pulsarClient, bk,
compactionScheduler);
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
@@ -149,6 +157,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
}
}
+ @BeforeMethod(alwaysRun = true)
+ public void beforeMethod() throws Exception {
+ admin.namespaces().removeRetention("my-tenant/my-ns");
+ AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
+ }
+
protected long compact(String topic) throws ExecutionException,
InterruptedException {
return compactor.compact(topic).get();
}
@@ -165,7 +179,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompaction() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/compaction";
final int numMessages = 20;
final int maxKeys = 10;
@@ -229,7 +243,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactionWithReader() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/compaction-with-reader";
final int numMessages = 20;
final int maxKeys = 10;
@@ -290,7 +304,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testReadCompactedBeforeCompaction() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/read-compacted-before-compaction";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -330,7 +344,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testReadEntriesAfterCompaction() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/read-entries-after-compaction";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -361,7 +375,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testSeekEarliestAfterCompaction() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/seek-earliest-after-compaction";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -402,7 +416,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testBrokerRestartAfterCompaction() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/test-restart-after-compaction";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -444,7 +458,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactEmptyTopic() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/compact-empty-topic";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -467,7 +481,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testFirstMessageRetained() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/first-message-retained";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -508,7 +522,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testBatchMessageIdsDontChange() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/batch-message-ids-dont-change";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -571,7 +585,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testBatchMessageWithNullValue() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/batch-message-with-null-value";
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.receiverQueueSize(1).readCompacted(true).subscribe().close();
@@ -625,7 +639,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testWholeBatchCompactedOut() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/whole-batch-compacted-out";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -670,7 +684,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
restartBroker();
FieldUtils.writeField(compactor, "topicCompactionRetainNullKey",
retainNullKey, true);
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/key-less-messages-pass-through-" + retainNullKey;
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -737,7 +751,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testEmptyPayloadDeletes() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/empty-payload-deletes";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -819,7 +833,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testEmptyPayloadDeletesWhenCompressed() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/empty-payload-deletes-when-compressed";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -900,7 +914,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactorReadsCompacted() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/compactor-reads-compacted";
// capture opened ledgers
Set<Long> ledgersOpened = Sets.newConcurrentHashSet();
@@ -952,6 +966,12 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
assertFalse(ledgersOpened.contains(info.ledgers.get(1).ledgerId));
ledgersOpened.clear();
+ try (Producer<byte[]> producerNormal =
pulsarClient.newProducer().topic(topic).create()) {
+ producerNormal.newMessage()
+ .key("key2")
+ .value("my-message".getBytes())
+ .send();
+ }
// force broker to close resources for topic
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();
@@ -1000,7 +1020,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactCompressedNoBatch() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/compact-compressed-no-batch";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1039,7 +1059,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactCompressedBatching() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/compact-compressed-batching";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1118,7 +1138,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactEncryptedNoBatch() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/compact-encrypted-no-batch";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1160,7 +1180,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactEncryptedBatching() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/compact-encrypted-batching";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1216,7 +1236,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactEncryptedAndCompressedNoBatch() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/compact-encrypted-and-compressed-no-batch";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1259,7 +1279,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCompactEncryptedAndCompressedBatching() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/compact-encrypted-and-compressed-batching";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1317,7 +1337,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test
public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/empty-payload-deletes-when-encrypted";
// subscribe before sending anything, so that we get all messages
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1413,7 +1433,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactionWithLastDeletedKey(boolean batching) throws
Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic =
"persistent://my-tenant/my-ns/compaction-with-last-deleted-key-" + batching;
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -1439,7 +1459,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testEmptyCompactionLedger(boolean batching) throws Exception {
- String topic = "persistent://my-tenant/my-ns/my-topic1";
+ String topic = "persistent://my-tenant/my-ns/empty-compaction-ledger-"
+ batching;
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -2347,7 +2367,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(timeOut = 120 * 1000)
public void testConcurrentCompactionAndTopicDelete() throws Exception {
- final String topicName =
newUniqueName("persistent://my-tenant/my-ns/tp");
+ final String topicName =
newUniqueName("persistent://my-tenant/my-ns/concurrent-compaction-topic-delete");
admin.topics().createNonPartitionedTopic(topicName);
// Load up the topic.
Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
@@ -2460,4 +2480,68 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
assertEquals(results, expected);
}
+
+ @Test
+ public void testPhaseTwoInterruption() throws Exception {
+ // Set infinite retention to retain all original ledgers
+ admin.namespaces().setRetention("my-tenant/my-ns", new
RetentionPolicies(-1, -1));
+ final var topic =
"persistent://my-tenant/my-ns/phase-two-interruption";
+ @Cleanup final var producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ final BiConsumer<String, String> send = (key, value) -> {
+ final var msgId =
producer.newMessage().key(key).value(value).sendAsync().join();
+ log.info("Sent {} => {} to {}", key, value, msgId);
+ };
+
+ send.accept("key-0", "value");
+ for (int i = 0; i < 3; i++) {
+ send.accept("key-1", "value-" + i);
+ }
+
+ triggerAndWaitCompaction(topic); // update the compaction horizon
+
+ AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {
+ // Simulate the case when the topic is closed during compaction phase two
+ CompletableFuture.runAsync(() -> {
+ final var persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).join()
+ .orElseThrow();
+ persistentTopic.close().join();
+ });
+ };
+ // Send new messages so that the compaction won't be skipped
+ send.accept("key-2", "value-0");
+ send.accept("key-2", "value-1");
+ admin.topics().triggerCompaction(topic);
+ Awaitility.await().untilAsserted(() ->
assertFalse(pulsar.getBrokerService().getTopics()
+ .containsKey(TopicName.get(topic).toString())));
+
+ AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
+
+ // Messages of "key-2" are not compacted due to the injected failure, but the previous messages are read from
+ // the compacted ledger rather than the original ledger.
+ verifyReadKeyValues(topic, true, List.of("key-0", "value", "key-1",
"value-2", "key-2", "value-0", "key-2",
+ "value-1"));
+ // The original ledger still exists so old values of "key-1" can be read
+ verifyReadKeyValues(topic, false, List.of("key-0", "value", "key-1",
"value-0", "key-1", "value-1", "key-1",
+ "value-2", "key-2", "value-0", "key-2", "value-1"));
+ }
+
+ private void verifyReadKeyValues(String topic, boolean readCompacted,
List<String> expectedKeyValues)
+ throws Exception {
+ @Cleanup final var reader =
pulsarClient.newReader(Schema.STRING).topic(topic).readCompacted(readCompacted)
+ .startMessageId(MessageId.earliest).create();
+ final var keyValues = new ArrayList<String>();
+ while (reader.hasMessageAvailable()) {
+ final var msg = reader.readNext();
+ keyValues.add(msg.getKey());
+ keyValues.add(msg.getValue());
+ }
+ assertEquals(keyValues, expectedKeyValues,
+ readCompacted + " " + String.join(",", keyValues.toArray(new
String[0])));
+ }
+
+ private void triggerAndWaitCompaction(String topic) throws Exception {
+ admin.topics().triggerCompaction(topic);
+ Awaitility.await().untilAsserted(() -> assertEquals(
+ admin.topics().compactionStatus(topic).status,
LongRunningProcessStatus.Status.SUCCESS));
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
index 4cdd195d493..2ca03c55b30 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -20,18 +20,23 @@ package org.apache.pulsar.compaction;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl.MSG_COMPRESSION_TYPE;
import static org.testng.Assert.assertEquals;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
@@ -45,45 +50,49 @@ import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "flaky")
-public class StrategicCompactionTest extends CompactionTest {
+public class StrategicCompactionTest extends MockedPulsarServiceBaseTest {
+
+ protected ScheduledExecutorService compactionScheduler;
+ protected BookKeeper bk;
private TopicCompactionStrategy strategy;
private StrategicTwoPhaseCompactor compactor;
- @BeforeMethod
+ @BeforeClass
@Override
public void setup() throws Exception {
- super.setup();
+ super.internalSetup();
+ compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
+ bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null,
Optional.empty(), null).get();
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
strategy = new
TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
}
+ @AfterClass(alwaysRun = true)
@Override
- protected long compact(String topic) throws ExecutionException,
InterruptedException {
- return (long) compactor.compact(topic, strategy).get();
- }
-
- @Override
- protected long compact(String topic, CryptoKeyReader cryptoKeyReader)
- throws ExecutionException, InterruptedException {
- return (long) compactor.compact(topic, strategy,
cryptoKeyReader).get();
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ bk.close();
+ if (compactionScheduler != null) {
+ compactionScheduler.shutdownNow();
+ }
}
- @Override
- protected PublishingOrderCompactor getCompactor() {
- return compactor;
+ private long compact(String topic) throws ExecutionException,
InterruptedException {
+ return (long) compactor.compact(topic, strategy).get();
}
-
@Test
public void testNumericOrderCompaction() throws Exception {
strategy = new NumericOrderCompactionStrategy();
- String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String topic =
"persistent://my-property/use/my-ns/numeric-order-compaction";
final int numMessages = 50;
final int maxKeys = 5;