This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 91544c9cad4 [fix][broker] Avoid splitting one batch message into two entries in StrategicTwoPhaseCompactor (#21156) 91544c9cad4 is described below commit 91544c9cad45b6435dd3a7edaf8fa95bf55a4f5a Author: Kai Wang <kw...@apache.org> AuthorDate: Fri Sep 15 10:26:17 2023 +0800 [fix][broker] Avoid splitting one batch message into two entries in StrategicTwoPhaseCompactor (#21156) --- .../client/impl/RawBatchMessageContainerImpl.java | 41 ++++++++-- .../compaction/StrategicTwoPhaseCompactor.java | 90 ++++++++++------------ .../impl/RawBatchMessageContainerImplTest.java | 53 +++++++------ .../apache/pulsar/compaction/CompactionTest.java | 11 +-- .../StrategicCompactionRetentionTest.java | 2 +- .../pulsar/compaction/StrategicCompactionTest.java | 66 +++++++++++++++- .../pulsar/compaction/StrategicCompactorTest.java | 4 +- 7 files changed, 172 insertions(+), 95 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java index 7e1c2cd5e3f..ba8d3db7178 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.Set; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageCrypto; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -44,17 +45,17 @@ import org.apache.pulsar.common.protocol.Commands; * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] */ public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl { - MessageCrypto msgCrypto; - Set<String> encryptionKeys; - CryptoKeyReader cryptoKeyReader; + private MessageCrypto<MessageMetadata, MessageMetadata> msgCrypto; + private Set<String> encryptionKeys; + private CryptoKeyReader cryptoKeyReader; + private MessageIdAdv lastAddedMessageId; - public RawBatchMessageContainerImpl(int maxNumMessagesInBatch, int maxBytesInBatch) { + public RawBatchMessageContainerImpl() { super(); this.compressionType = CompressionType.NONE; this.compressor = new CompressionCodecNone(); - this.maxNumMessagesInBatch = maxNumMessagesInBatch; - this.maxBytesInBatch = maxBytesInBatch; } + private ByteBuf encrypt(ByteBuf compressedPayload) { if (msgCrypto == null) { return compressedPayload; @@ -90,6 +91,28 @@ public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl { this.cryptoKeyReader = cryptoKeyReader; } + @Override + public boolean add(MessageImpl<?> msg, SendCallback callback) { + this.lastAddedMessageId = (MessageIdAdv) msg.getMessageId(); + return super.add(msg, callback); + } + + @Override + protected boolean isBatchFull() { + return false; + } + + @Override + public boolean haveEnoughSpace(MessageImpl<?> msg) { + if (lastAddedMessageId == null) { + return true; + } + // Keep same batch compact to same batch. + MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId(); + return msgId.getLedgerId() == lastAddedMessageId.getLedgerId() + && msgId.getEntryId() == lastAddedMessageId.getEntryId(); + } + /** * Serializes the batched messages and return the ByteBuf. * It sets the CompressionType and Encryption Keys from the batched messages. @@ -168,4 +191,10 @@ public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl { clear(); return buf; } + + @Override + public void clear() { + this.lastAddedMessageId = null; + super.clear(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java index a6b09427427..fefa2ee959c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.compaction; -import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.time.Duration; import java.util.Iterator; @@ -63,39 +62,19 @@ import org.slf4j.LoggerFactory; public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor { private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class); private static final int MAX_OUTSTANDING = 500; - private static final int MAX_NUM_MESSAGES_IN_BATCH = 1000; - private static final int MAX_BYTES_IN_BATCH = 128 * 1024; private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20 * 1000; private final Duration phaseOneLoopReadTimeout; private final RawBatchMessageContainerImpl batchMessageContainer; - @VisibleForTesting public StrategicTwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, BookKeeper bk, - ScheduledExecutorService scheduler, - int maxNumMessagesInBatch) { - this(conf, pulsar, bk, scheduler, maxNumMessagesInBatch, MAX_BYTES_IN_BATCH); - } - - private StrategicTwoPhaseCompactor(ServiceConfiguration conf, - PulsarClient pulsar, - BookKeeper bk, - ScheduledExecutorService scheduler, - int maxNumMessagesInBatch, - int maxBytesInBatch) { + ScheduledExecutorService scheduler) { super(conf, pulsar, bk, scheduler); - batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch, maxBytesInBatch); + batchMessageContainer = new RawBatchMessageContainerImpl(); phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); } - public StrategicTwoPhaseCompactor(ServiceConfiguration conf, - PulsarClient pulsar, - BookKeeper bk, - ScheduledExecutorService scheduler) { - this(conf, pulsar, bk, scheduler, MAX_NUM_MESSAGES_IN_BATCH, MAX_BYTES_IN_BATCH); - } - public CompletableFuture<Long> compact(String topic) { throw new UnsupportedOperationException(); } @@ -418,7 +397,6 @@ public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor { .whenComplete((res, exception2) -> { if (exception2 != null) { promise.completeExceptionally(exception2); - return; } }); phaseTwoLoop(topic, reader, lh, outstanding, promise); @@ -443,35 +421,45 @@ public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor { <T> CompletableFuture<Boolean> addToCompactedLedger( LedgerHandle lh, Message<T> m, String topic, Semaphore outstanding) { + if (m == null) { + return flushBatchMessage(lh, topic, outstanding); + } + if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) { + batchMessageContainer.add((MessageImpl<?>) m, null); + return CompletableFuture.completedFuture(false); + } + CompletableFuture<Boolean> f = flushBatchMessage(lh, topic, outstanding); + batchMessageContainer.add((MessageImpl<?>) m, null); + return f; + } + + private CompletableFuture<Boolean> flushBatchMessage(LedgerHandle lh, String topic, + Semaphore outstanding) { + if (batchMessageContainer.getNumMessagesInBatch() <= 0) { + return CompletableFuture.completedFuture(false); + } CompletableFuture<Boolean> bkf = new CompletableFuture<>(); - if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) { - if (batchMessageContainer.getNumMessagesInBatch() > 0) { - try { - ByteBuf serialized = batchMessageContainer.toByteBuf(); - outstanding.acquire(); - mxBean.addCompactionWriteOp(topic, serialized.readableBytes()); - long start = System.nanoTime(); - lh.asyncAddEntry(serialized, - (rc, ledger, eid, ctx) -> { - outstanding.release(); - mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS); - if (rc != BKException.Code.OK) { - bkf.completeExceptionally(BKException.create(rc)); - } else { - bkf.complete(true); - } - }, null); + try { + ByteBuf serialized = batchMessageContainer.toByteBuf(); + outstanding.acquire(); + mxBean.addCompactionWriteOp(topic, serialized.readableBytes()); + long start = System.nanoTime(); + lh.asyncAddEntry(serialized, + (rc, ledger, eid, ctx) -> { + outstanding.release(); + mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS); + if (rc != BKException.Code.OK) { + bkf.completeExceptionally(BKException.create(rc)); + } else { + bkf.complete(true); + } + }, null); - } catch (Throwable t) { - log.error("Failed to add entry", t); - batchMessageContainer.discard((Exception) t); - return FutureUtil.failedFuture(t); - } - } else { - bkf.complete(false); - } - } else { - bkf.complete(false); + } catch (Throwable t) { + log.error("Failed to add entry", t); + batchMessageContainer.discard((Exception) t); + bkf.completeExceptionally(t); + return bkf; } return bkf; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java index 9b8b1e5efb9..d79a31c07f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java @@ -47,7 +47,6 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.compaction.CompactionTest; import org.testng.Assert; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class RawBatchMessageContainerImplTest { @@ -56,8 +55,6 @@ public class RawBatchMessageContainerImplTest { CryptoKeyReader cryptoKeyReader; Map<String, EncryptionContext.EncryptionKey> encryptKeys; - int maxBytesInBatch = 5 * 1024 * 1024; - public void setEncryptionAndCompression(boolean encrypt, boolean compress) { if (compress) { compressionType = ZSTD; @@ -107,22 +104,22 @@ public class RawBatchMessageContainerImplTest { public void setup() throws Exception { setEncryptionAndCompression(false, true); } - @DataProvider(name = "testBatchLimitByMessageCount") - public static Object[][] testBatchLimitByMessageCount() { - return new Object[][] {{true}, {false}}; - } - - @Test(timeOut = 20000, dataProvider = "testBatchLimitByMessageCount") - public void testToByteBufWithBatchLimit(boolean testBatchLimitByMessageCount) throws IOException { - RawBatchMessageContainerImpl container = testBatchLimitByMessageCount ? - new RawBatchMessageContainerImpl(2, Integer.MAX_VALUE) : - new RawBatchMessageContainerImpl(Integer.MAX_VALUE, 5); + @Test(timeOut = 20000) + public void testToByteBufWithBatchLimit()throws IOException { + RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(); String topic = "my-topic"; - var full1 = container.add(createMessage(topic, "hi-1", 0), null); - var full2 = container.add(createMessage(topic, "hi-2", 1), null); + MessageImpl message1 = createMessage(topic, "hi-1", 0); + boolean hasEnoughSpase1 = container.haveEnoughSpace(message1); + var full1 = container.add(message1, null); assertFalse(full1); - assertTrue(full2); + assertTrue(hasEnoughSpase1); + MessageImpl message2 = createMessage(topic, "hi-2", 1); + boolean hasEnoughSpase2 = container.haveEnoughSpace(message2); + assertFalse(hasEnoughSpase2); + var full2 = container.add(message2, null); + assertFalse(full2); + ByteBuf buf = container.toByteBuf(); @@ -167,7 +164,7 @@ public class RawBatchMessageContainerImplTest { public void testToByteBufWithCompressionAndEncryption() throws IOException { setEncryptionAndCompression(true, true); - RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2, maxBytesInBatch); + RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(); container.setCryptoKeyReader(cryptoKeyReader); String topic = "my-topic"; container.add(createMessage(topic, "hi-1", 0), null); @@ -217,7 +214,7 @@ public class RawBatchMessageContainerImplTest { @Test public void testToByteBufWithSingleMessage() throws IOException { - RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2, maxBytesInBatch); + RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(); String topic = "my-topic"; container.add(createMessage(topic, "hi-1", 0), null); ByteBuf buf = container.toByteBuf(); @@ -250,25 +247,31 @@ public class RawBatchMessageContainerImplTest { } @Test - public void testMaxNumMessagesInBatch() { - RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1, maxBytesInBatch); + public void testAddDifferentBatchMessage() { + RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(); String topic = "my-topic"; boolean isFull = container.add(createMessage(topic, "hi", 0), null); - Assert.assertTrue(isFull); - Assert.assertTrue(container.isBatchFull()); + Assert.assertFalse(isFull); + Assert.assertFalse(container.isBatchFull()); + MessageImpl message = createMessage(topic, "hi-1", 0); + Assert.assertTrue(container.haveEnoughSpace(message)); + isFull = container.add(message, null); + Assert.assertFalse(isFull); + message = createMessage(topic, "hi-2", 1); + Assert.assertFalse(container.haveEnoughSpace(message)); } @Test(expectedExceptions = UnsupportedOperationException.class) public void testCreateOpSendMsg() { - RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1, maxBytesInBatch); + RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(); container.createOpSendMsg(); } @Test public void testToByteBufWithEncryptionWithoutCryptoKeyReader() { setEncryptionAndCompression(true, false); - RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1, maxBytesInBatch); + RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(); String topic = "my-topic"; container.add(createMessage(topic, "hi-1", 0), null); Assert.assertEquals(container.getNumMessagesInBatch(), 1); @@ -286,7 +289,7 @@ public class RawBatchMessageContainerImplTest { @Test public void testToByteBufWithEncryptionWithInvalidEncryptKeys() { setEncryptionAndCompression(true, false); - RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1, maxBytesInBatch); + RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(); container.setCryptoKeyReader(cryptoKeyReader); encryptKeys = new HashMap<>(); encryptKeys.put(null, null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index e603b3ccc4d..f125022c187 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -80,7 +80,6 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -540,14 +539,8 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(message2.getKey(), "key2"); Assert.assertEquals(new String(message2.getData()), "my-message-3"); if (getCompactor() instanceof StrategicTwoPhaseCompactor) { - MessageIdImpl id = (MessageIdImpl) messages.get(0).getMessageId(); - MessageIdImpl id1 = new MessageIdImpl( - id.getLedgerId(), id.getEntryId(), id.getPartitionIndex()); - Assert.assertEquals(message1.getMessageId(), id1); - id = (MessageIdImpl) messages.get(2).getMessageId(); - MessageIdImpl id2 = new MessageIdImpl( - id.getLedgerId(), id.getEntryId(), id.getPartitionIndex()); - Assert.assertEquals(message2.getMessageId(), id2); + Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId()); + Assert.assertEquals(message2.getMessageId(), messages.get(1).getMessageId()); } else { Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId()); Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java index 1cac04c2fa9..e556ec8e0b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java @@ -34,7 +34,7 @@ public class StrategicCompactionRetentionTest extends CompactionRetentionTest { @Override public void setup() throws Exception { super.setup(); - compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1); + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java index 135a839bd54..54563431052 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java @@ -18,22 +18,33 @@ */ package org.apache.pulsar.compaction; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE; +import static org.testng.Assert.assertEquals; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.topics.TopicCompactionStrategy; +import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -47,7 +58,7 @@ public class StrategicCompactionTest extends CompactionTest { @Override public void setup() throws Exception { super.setup(); - compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1); + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy(); } @@ -148,5 +159,58 @@ public class StrategicCompactionTest extends CompactionTest { Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet()); } + @Test(timeOut = 20000) + public void testSameBatchCompactToSameBatch() throws Exception { + final String topic = + "persistent://my-property/use/my-ns/testSameBatchCompactToSameBatch" + UUID.randomUUID(); + + // Use odd number to make sure the last message is flush by `reader.hasNext() == false`. + final int messages = 11; + + // 1.create producer and publish message to the topic. + ProducerBuilder<Integer> builder = pulsarClient.newProducer(Schema.INT32) + .compressionType(MSG_COMPRESSION_TYPE).topic(topic); + builder.batchingMaxMessages(2) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS); + + Producer<Integer> producer = builder.create(); + + List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(String.valueOf(i)) + .value(i) + .sendAsync()); + } + FutureUtil.waitForAll(futures).get(); + + // 2.compact the topic. + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + // consumer with readCompacted enabled only get compacted entries + try (Consumer<Integer> consumer = pulsarClient + .newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub1") + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { + int received = 0; + while (true) { + Message<Integer> m = consumer.receive(2, TimeUnit.SECONDS); + if (m == null) { + break; + } + MessageIdAdv messageId = (MessageIdAdv) m.getMessageId(); + if (received < messages - 1) { + assertEquals(messageId.getBatchSize(), 2); + } else { + assertEquals(messageId.getBatchSize(), 0); + } + received++; + } + assertEquals(received, messages); + } + + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java index 91dd8a2bd35..bc65791b323 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java @@ -33,7 +33,7 @@ public class StrategicCompactorTest extends CompactorTest { @Override public void setup() throws Exception { super.setup(); - compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1); + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy(); } @@ -46,4 +46,4 @@ public class StrategicCompactorTest extends CompactorTest { protected Compactor getCompactor() { return compactor; } -} \ No newline at end of file +}