This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 91544c9cad4 [fix][broker] Avoid splitting one batch message into two 
entries in StrategicTwoPhaseCompactor (#21156)
91544c9cad4 is described below

commit 91544c9cad45b6435dd3a7edaf8fa95bf55a4f5a
Author: Kai Wang <kw...@apache.org>
AuthorDate: Fri Sep 15 10:26:17 2023 +0800

    [fix][broker] Avoid splitting one batch message into two entries in 
StrategicTwoPhaseCompactor (#21156)
---
 .../client/impl/RawBatchMessageContainerImpl.java  | 41 ++++++++--
 .../compaction/StrategicTwoPhaseCompactor.java     | 90 ++++++++++------------
 .../impl/RawBatchMessageContainerImplTest.java     | 53 +++++++------
 .../apache/pulsar/compaction/CompactionTest.java   | 11 +--
 .../StrategicCompactionRetentionTest.java          |  2 +-
 .../pulsar/compaction/StrategicCompactionTest.java | 66 +++++++++++++++-
 .../pulsar/compaction/StrategicCompactorTest.java  |  4 +-
 7 files changed, 172 insertions(+), 95 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
index 7e1c2cd5e3f..ba8d3db7178 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.Set;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -44,17 +45,17 @@ import org.apache.pulsar.common.protocol.Commands;
  * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, 
v3), (k3, v3)]
  */
 public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
-    MessageCrypto msgCrypto;
-    Set<String> encryptionKeys;
-    CryptoKeyReader cryptoKeyReader;
+    private MessageCrypto<MessageMetadata, MessageMetadata> msgCrypto;
+    private Set<String> encryptionKeys;
+    private CryptoKeyReader cryptoKeyReader;
+    private MessageIdAdv lastAddedMessageId;
 
-    public RawBatchMessageContainerImpl(int maxNumMessagesInBatch, int 
maxBytesInBatch) {
+    public RawBatchMessageContainerImpl() {
         super();
         this.compressionType = CompressionType.NONE;
         this.compressor = new CompressionCodecNone();
-        this.maxNumMessagesInBatch = maxNumMessagesInBatch;
-        this.maxBytesInBatch = maxBytesInBatch;
     }
+
     private ByteBuf encrypt(ByteBuf compressedPayload) {
         if (msgCrypto == null) {
             return compressedPayload;
@@ -90,6 +91,28 @@ public class RawBatchMessageContainerImpl extends 
BatchMessageContainerImpl {
         this.cryptoKeyReader = cryptoKeyReader;
     }
 
+    @Override
+    public boolean add(MessageImpl<?> msg, SendCallback callback) {
+        this.lastAddedMessageId = (MessageIdAdv) msg.getMessageId();
+        return super.add(msg, callback);
+    }
+
+    @Override
+    protected boolean isBatchFull() {
+        return false;
+    }
+
+    @Override
+    public boolean haveEnoughSpace(MessageImpl<?> msg) {
+        if (lastAddedMessageId == null) {
+            return true;
+        }
+        // Only messages from the same source entry (same ledger id and entry id) go into the same batch.
+        MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
+        return msgId.getLedgerId() == lastAddedMessageId.getLedgerId()
+                && msgId.getEntryId() == lastAddedMessageId.getEntryId();
+    }
+
     /**
      * Serializes the batched messages and return the ByteBuf.
      * It sets the CompressionType and Encryption Keys from the batched 
messages.
@@ -168,4 +191,10 @@ public class RawBatchMessageContainerImpl extends 
BatchMessageContainerImpl {
         clear();
         return buf;
     }
+
+    @Override
+    public void clear() {
+        this.lastAddedMessageId = null;
+        super.clear();
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index a6b09427427..fefa2ee959c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.compaction;
 
-import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import java.time.Duration;
 import java.util.Iterator;
@@ -63,39 +62,19 @@ import org.slf4j.LoggerFactory;
 public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
     private static final Logger log = 
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
-    private static final int MAX_NUM_MESSAGES_IN_BATCH = 1000;
-    private static final int MAX_BYTES_IN_BATCH = 128 * 1024;
     private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20 
* 1000;
     private final Duration phaseOneLoopReadTimeout;
     private final RawBatchMessageContainerImpl batchMessageContainer;
 
-    @VisibleForTesting
     public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
                                       PulsarClient pulsar,
                                       BookKeeper bk,
-                                      ScheduledExecutorService scheduler,
-                                      int maxNumMessagesInBatch) {
-        this(conf, pulsar, bk, scheduler, maxNumMessagesInBatch, 
MAX_BYTES_IN_BATCH);
-    }
-
-    private StrategicTwoPhaseCompactor(ServiceConfiguration conf,
-                                      PulsarClient pulsar,
-                                      BookKeeper bk,
-                                      ScheduledExecutorService scheduler,
-                                      int maxNumMessagesInBatch,
-                                      int maxBytesInBatch) {
+                                      ScheduledExecutorService scheduler) {
         super(conf, pulsar, bk, scheduler);
-        batchMessageContainer = new 
RawBatchMessageContainerImpl(maxNumMessagesInBatch, maxBytesInBatch);
+        batchMessageContainer = new RawBatchMessageContainerImpl();
         phaseOneLoopReadTimeout = 
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
     }
 
-    public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
-                                      PulsarClient pulsar,
-                                      BookKeeper bk,
-                                      ScheduledExecutorService scheduler) {
-        this(conf, pulsar, bk, scheduler, MAX_NUM_MESSAGES_IN_BATCH, 
MAX_BYTES_IN_BATCH);
-    }
-
     public CompletableFuture<Long> compact(String topic) {
         throw new UnsupportedOperationException();
     }
@@ -418,7 +397,6 @@ public class StrategicTwoPhaseCompactor extends 
TwoPhaseCompactor {
                                 .whenComplete((res, exception2) -> {
                                     if (exception2 != null) {
                                         
promise.completeExceptionally(exception2);
-                                        return;
                                     }
                                 });
                         phaseTwoLoop(topic, reader, lh, outstanding, promise);
@@ -443,35 +421,45 @@ public class StrategicTwoPhaseCompactor extends 
TwoPhaseCompactor {
 
     <T> CompletableFuture<Boolean> addToCompactedLedger(
             LedgerHandle lh, Message<T> m, String topic, Semaphore 
outstanding) {
+        if (m == null) {
+            return flushBatchMessage(lh, topic, outstanding);
+        }
+        if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) {
+            batchMessageContainer.add((MessageImpl<?>) m, null);
+            return CompletableFuture.completedFuture(false);
+        }
+        CompletableFuture<Boolean> f = flushBatchMessage(lh, topic, 
outstanding);
+        batchMessageContainer.add((MessageImpl<?>) m, null);
+        return f;
+    }
+
+    private CompletableFuture<Boolean> flushBatchMessage(LedgerHandle lh, 
String topic,
+                                                         Semaphore 
outstanding) {
+        if (batchMessageContainer.getNumMessagesInBatch() <= 0) {
+            return CompletableFuture.completedFuture(false);
+        }
         CompletableFuture<Boolean> bkf = new CompletableFuture<>();
-        if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
-            if (batchMessageContainer.getNumMessagesInBatch() > 0) {
-                try {
-                    ByteBuf serialized = batchMessageContainer.toByteBuf();
-                    outstanding.acquire();
-                    mxBean.addCompactionWriteOp(topic, 
serialized.readableBytes());
-                    long start = System.nanoTime();
-                    lh.asyncAddEntry(serialized,
-                            (rc, ledger, eid, ctx) -> {
-                                outstanding.release();
-                                mxBean.addCompactionLatencyOp(topic, 
System.nanoTime() - start, TimeUnit.NANOSECONDS);
-                                if (rc != BKException.Code.OK) {
-                                    
bkf.completeExceptionally(BKException.create(rc));
-                                } else {
-                                    bkf.complete(true);
-                                }
-                            }, null);
+        try {
+            ByteBuf serialized = batchMessageContainer.toByteBuf();
+            outstanding.acquire();
+            mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
+            long start = System.nanoTime();
+            lh.asyncAddEntry(serialized,
+                    (rc, ledger, eid, ctx) -> {
+                        outstanding.release();
+                        mxBean.addCompactionLatencyOp(topic, System.nanoTime() 
- start, TimeUnit.NANOSECONDS);
+                        if (rc != BKException.Code.OK) {
+                            bkf.completeExceptionally(BKException.create(rc));
+                        } else {
+                            bkf.complete(true);
+                        }
+                    }, null);
 
-                } catch (Throwable t) {
-                    log.error("Failed to add entry", t);
-                    batchMessageContainer.discard((Exception) t);
-                    return FutureUtil.failedFuture(t);
-                }
-            } else {
-                bkf.complete(false);
-            }
-        } else {
-            bkf.complete(false);
+        } catch (Throwable t) {
+            log.error("Failed to add entry", t);
+            batchMessageContainer.discard((Exception) t);
+            bkf.completeExceptionally(t);
+            return bkf;
         }
         return bkf;
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
index 9b8b1e5efb9..d79a31c07f2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.compaction.CompactionTest;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class RawBatchMessageContainerImplTest {
@@ -56,8 +55,6 @@ public class RawBatchMessageContainerImplTest {
     CryptoKeyReader cryptoKeyReader;
     Map<String, EncryptionContext.EncryptionKey> encryptKeys;
 
-    int maxBytesInBatch = 5 * 1024 * 1024;
-
     public void setEncryptionAndCompression(boolean encrypt, boolean compress) 
{
         if (compress) {
             compressionType = ZSTD;
@@ -107,22 +104,22 @@ public class RawBatchMessageContainerImplTest {
     public void setup() throws Exception {
         setEncryptionAndCompression(false, true);
     }
-    @DataProvider(name = "testBatchLimitByMessageCount")
-    public static Object[][] testBatchLimitByMessageCount() {
-        return new Object[][] {{true}, {false}};
-    }
-
-    @Test(timeOut = 20000, dataProvider = "testBatchLimitByMessageCount")
-    public void testToByteBufWithBatchLimit(boolean 
testBatchLimitByMessageCount) throws IOException {
-        RawBatchMessageContainerImpl container = testBatchLimitByMessageCount ?
-                new RawBatchMessageContainerImpl(2, Integer.MAX_VALUE) :
-                new RawBatchMessageContainerImpl(Integer.MAX_VALUE, 5);
+    @Test(timeOut = 20000)
+    public void testToByteBufWithBatchLimit()throws IOException {
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl();
 
         String topic = "my-topic";
-        var full1 = container.add(createMessage(topic, "hi-1", 0), null);
-        var full2 = container.add(createMessage(topic, "hi-2", 1), null);
+        MessageImpl message1 = createMessage(topic, "hi-1", 0);
+        boolean hasEnoughSpase1 = container.haveEnoughSpace(message1);
+        var full1 = container.add(message1, null);
         assertFalse(full1);
-        assertTrue(full2);
+        assertTrue(hasEnoughSpase1);
+        MessageImpl message2 = createMessage(topic, "hi-2", 1);
+        boolean hasEnoughSpase2 = container.haveEnoughSpace(message2);
+        assertFalse(hasEnoughSpase2);
+        var full2 = container.add(message2, null);
+        assertFalse(full2);
+
         ByteBuf buf = container.toByteBuf();
 
 
@@ -167,7 +164,7 @@ public class RawBatchMessageContainerImplTest {
     public void testToByteBufWithCompressionAndEncryption() throws IOException 
{
         setEncryptionAndCompression(true, true);
 
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(2, maxBytesInBatch);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl();
         container.setCryptoKeyReader(cryptoKeyReader);
         String topic = "my-topic";
         container.add(createMessage(topic, "hi-1", 0), null);
@@ -217,7 +214,7 @@ public class RawBatchMessageContainerImplTest {
 
     @Test
     public void testToByteBufWithSingleMessage() throws IOException {
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(2, maxBytesInBatch);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl();
         String topic = "my-topic";
         container.add(createMessage(topic, "hi-1", 0), null);
         ByteBuf buf = container.toByteBuf();
@@ -250,25 +247,31 @@ public class RawBatchMessageContainerImplTest {
     }
 
     @Test
-    public void testMaxNumMessagesInBatch() {
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1, maxBytesInBatch);
+    public void testAddDifferentBatchMessage() {
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl();
         String topic = "my-topic";
 
         boolean isFull = container.add(createMessage(topic, "hi", 0), null);
-        Assert.assertTrue(isFull);
-        Assert.assertTrue(container.isBatchFull());
+        Assert.assertFalse(isFull);
+        Assert.assertFalse(container.isBatchFull());
+        MessageImpl message = createMessage(topic, "hi-1", 0);
+        Assert.assertTrue(container.haveEnoughSpace(message));
+        isFull = container.add(message, null);
+        Assert.assertFalse(isFull);
+        message = createMessage(topic, "hi-2", 1);
+        Assert.assertFalse(container.haveEnoughSpace(message));
     }
 
     @Test(expectedExceptions = UnsupportedOperationException.class)
     public void testCreateOpSendMsg() {
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1, maxBytesInBatch);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl();
         container.createOpSendMsg();
     }
 
     @Test
     public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
         setEncryptionAndCompression(true, false);
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1, maxBytesInBatch);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl();
         String topic = "my-topic";
         container.add(createMessage(topic, "hi-1", 0), null);
         Assert.assertEquals(container.getNumMessagesInBatch(), 1);
@@ -286,7 +289,7 @@ public class RawBatchMessageContainerImplTest {
     @Test
     public void testToByteBufWithEncryptionWithInvalidEncryptKeys() {
         setEncryptionAndCompression(true, false);
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1, maxBytesInBatch);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl();
         container.setCryptoKeyReader(cryptoKeyReader);
         encryptKeys = new HashMap<>();
         encryptKeys.put(null, null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index e603b3ccc4d..f125022c187 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -80,7 +80,6 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -540,14 +539,8 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(message2.getKey(), "key2");
             Assert.assertEquals(new String(message2.getData()), 
"my-message-3");
             if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
-                MessageIdImpl id = (MessageIdImpl) 
messages.get(0).getMessageId();
-                MessageIdImpl id1 = new MessageIdImpl(
-                        id.getLedgerId(), id.getEntryId(), 
id.getPartitionIndex());
-                Assert.assertEquals(message1.getMessageId(), id1);
-                id = (MessageIdImpl) messages.get(2).getMessageId();
-                MessageIdImpl id2 = new MessageIdImpl(
-                        id.getLedgerId(), id.getEntryId(), 
id.getPartitionIndex());
-                Assert.assertEquals(message2.getMessageId(), id2);
+                Assert.assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
+                Assert.assertEquals(message2.getMessageId(), 
messages.get(1).getMessageId());
             } else {
                 Assert.assertEquals(message1.getMessageId(), 
messages.get(0).getMessageId());
                 Assert.assertEquals(message2.getMessageId(), 
messages.get(2).getMessageId());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
index 1cac04c2fa9..e556ec8e0b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
@@ -34,7 +34,7 @@ public class StrategicCompactionRetentionTest extends 
CompactionRetentionTest {
     @Override
     public void setup() throws Exception {
         super.setup();
-        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 1);
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
         strategy = new 
TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
index 135a839bd54..54563431052 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -18,22 +18,33 @@
  */
 package org.apache.pulsar.compaction;
 
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
+import static org.testng.Assert.assertEquals;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -47,7 +58,7 @@ public class StrategicCompactionTest extends CompactionTest {
     @Override
     public void setup() throws Exception {
         super.setup();
-        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 1);
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
         strategy = new 
TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
     }
 
@@ -148,5 +159,58 @@ public class StrategicCompactionTest extends 
CompactionTest {
         Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet());
     }
 
+    @Test(timeOut = 20000)
+    public void testSameBatchCompactToSameBatch() throws Exception {
+        final String topic =
+                
"persistent://my-property/use/my-ns/testSameBatchCompactToSameBatch" + 
UUID.randomUUID();
+
+        // Use an odd number to make sure the last message is flushed by 
`reader.hasNext() == false`.
+        final int messages = 11;
+
+        // 1.create producer and publish message to the topic.
+        ProducerBuilder<Integer> builder = 
pulsarClient.newProducer(Schema.INT32)
+                .compressionType(MSG_COMPRESSION_TYPE).topic(topic);
+        builder.batchingMaxMessages(2)
+                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
+
+        Producer<Integer> producer = builder.create();
+
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            futures.add(producer.newMessage().key(String.valueOf(i))
+                    .value(i)
+                    .sendAsync());
+        }
+        FutureUtil.waitForAll(futures).get();
+
+        // 2.compact the topic.
+        StrategicTwoPhaseCompactor compactor
+                = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic, strategy).get();
 
+        // A consumer with readCompacted enabled only gets compacted entries.
+        try (Consumer<Integer> consumer = pulsarClient
+                .newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .readCompacted(true)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) 
{
+            int received = 0;
+            while (true) {
+                Message<Integer> m = consumer.receive(2, TimeUnit.SECONDS);
+                if (m == null) {
+                    break;
+                }
+                MessageIdAdv messageId = (MessageIdAdv) m.getMessageId();
+                if (received < messages - 1) {
+                    assertEquals(messageId.getBatchSize(), 2);
+                } else {
+                    assertEquals(messageId.getBatchSize(), 0);
+                }
+                received++;
+            }
+            assertEquals(received, messages);
+        }
+
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
index 91dd8a2bd35..bc65791b323 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
@@ -33,7 +33,7 @@ public class StrategicCompactorTest extends CompactorTest {
     @Override
     public void setup() throws Exception {
         super.setup();
-        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 1);
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
         strategy = new 
TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
     }
 
@@ -46,4 +46,4 @@ public class StrategicCompactorTest extends CompactorTest {
     protected Compactor getCompactor() {
         return compactor;
     }
-}
\ No newline at end of file
+}

Reply via email to