This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d2728253c66 [improve][broker] PIP-429: Optimize Handling of Compacted
Last Entry by Skipping Payload Buffer Parsing (#24523)
d2728253c66 is described below
commit d2728253c666f7d4bd4f111356f3b97663603b6a
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Aug 4 13:46:03 2025 +0800
[improve][broker] PIP-429: Optimize Handling of Compacted Last Entry by
Skipping Payload Buffer Parsing (#24523)
---
.../broker/admin/impl/PersistentTopicsBase.java | 31 +++-----
.../apache/pulsar/broker/service/ServerCnx.java | 77 +++-----------------
.../pulsar/client/impl/RawBatchConverter.java | 12 ++--
.../pulsar/compaction/CompactedTopicImpl.java | 17 +++--
.../compaction/PulsarTopicCompactionService.java | 83 +++++++++++++++++-----
.../pulsar/compaction/TopicCompactionService.java | 36 ++++++----
.../apache/pulsar/compaction/CompactorTest.java | 7 +-
.../compaction/GetLastMessageIdCompactedTest.java | 41 +++++++++++
.../compaction/TopicCompactionServiceTest.java | 28 --------
pulsar-common/src/main/proto/PulsarApi.proto | 10 +++
10 files changed, 177 insertions(+), 165 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 459d82cede8..3b74e19d288 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2837,31 +2837,20 @@ public class PersistentTopicsBase extends AdminResource
{
"Get message ID by timestamp on a non-persistent topic
is not allowed");
}
final PersistentTopic persistentTopic = (PersistentTopic)
topic;
+ final var compactionService =
persistentTopic.getTopicCompactionService();
- return
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry
-> {
- if (lastEntry == null) {
- return findMessageIdByPublishTime(timestamp,
persistentTopic.getManagedLedger());
- }
- MessageMetadata metadata;
- Position position = lastEntry.getPosition();
- try {
- metadata =
Commands.parseMessageMetadata(lastEntry.getDataBuffer());
- } finally {
- lastEntry.release();
- }
- if (timestamp == metadata.getPublishTime()) {
- return CompletableFuture.completedFuture(new
MessageIdImpl(position.getLedgerId(),
- position.getEntryId(),
topicName.getPartitionIndex()));
- } else if (timestamp < metadata.getPublishTime()) {
+ return
compactionService.getLastMessagePosition().thenCompose(messagePosition -> {
+ if (messagePosition.ledgerId() < 0) {
+ // MessagePosition.EARLIEST (ledgerId -1): the compacted ledger holds no message, so search the managed ledger
+ return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
+ }
+ if (timestamp == messagePosition.publishTime()) {
+ return CompletableFuture.completedFuture(new
MessageIdImpl(messagePosition.ledgerId(),
+ messagePosition.entryId(),
topicName.getPartitionIndex()));
+ } else if (timestamp < messagePosition.publishTime()) {
- return
persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
- .thenApply(compactedEntry -> {
- try {
- return new
MessageIdImpl(compactedEntry.getLedgerId(),
- compactedEntry.getEntryId(),
topicName.getPartitionIndex());
- } finally {
- compactedEntry.release();
- }
- });
+ return compactionService.findEntryByPublishTime(timestamp)
+ .thenApply(compactedPosition -> new
MessageIdImpl(compactedPosition.getLedgerId(), compactedPosition.getEntryId(),
+ topicName.getPartitionIndex()));
} else {
return findMessageIdByPublishTime(timestamp,
persistentTopic.getManagedLedger());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 71cbefdcb5c..e2995a0a2cf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,7 +41,6 @@ import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
@@ -137,7 +136,6 @@ import
org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
-import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -148,10 +146,7 @@ import
org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
-import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.Metadata;
@@ -2275,7 +2270,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.thenApply(lastPosition -> {
int partitionIndex =
TopicName.getPartitionIndex(topic.getName());
- Position markDeletePosition = null;
+ Position markDeletePosition = PositionFactory.EARLIEST; // non-null default, overwritten below for a PersistentSubscription
if (consumer.getSubscription() instanceof
PersistentSubscription) {
markDeletePosition = ((PersistentSubscription)
consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
@@ -2336,8 +2331,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
} else {
// if readCompacted is false, we need to return
MessageId.earliest
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1,
partitionIndex, -1,
- markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
- markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
+ markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId()));
}
return;
}
@@ -2396,8 +2390,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
lastPosition.getLedgerId(),
lastPosition.getEntryId(), partitionIndex,
largestBatchIndex,
- markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
- markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
+ markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId()));
}
});
});
@@ -2405,38 +2398,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private void handleLastMessageIdFromCompactionService(PersistentTopic
persistentTopic, long requestId,
int partitionIndex,
Position markDeletePosition) {
-
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry
-> {
- if (entry != null) {
- try {
- // in this case, all the data has been compacted, so
return the last position
- // in the compacted ledger to the client
- ByteBuf payload = entry.getDataBuffer();
- MessageMetadata metadata =
Commands.parseMessageMetadata(payload);
- int largestBatchIndex;
- try {
- largestBatchIndex =
calculateTheLastBatchIndexInBatch(metadata, payload);
- } catch (IOException ioEx) {
- writeAndFlush(Commands.newError(requestId,
ServerError.MetadataError,
- "Failed to deserialize batched message from
the last entry of the compacted Ledger: "
- + ioEx.getMessage()));
- return;
- }
-
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
- entry.getLedgerId(), entry.getEntryId(),
partitionIndex, largestBatchIndex,
- markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
- markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
- } finally {
- entry.release();
- }
- } else {
- // in this case, the ledgers been removed except the current
ledger
- // and current ledger without any data
- writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
- -1, -1, partitionIndex, -1,
- markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
- markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
- }
- }).exceptionally(ex -> {
+
persistentTopic.getTopicCompactionService().getLastMessagePosition().thenAccept(position
-> // position is MessagePosition.EARLIEST when there is no compacted message
+ writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
position.ledgerId(), position.entryId(),
+ partitionIndex, position.batchIndex(),
markDeletePosition.getLedgerId(),
+ markDeletePosition.getEntryId()))
+ ).exceptionally(ex -> {
writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to read last entry of the compacted Ledger "
@@ -2445,33 +2411,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
}
- private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata,
ByteBuf payload) throws IOException {
- int batchSize = metadata.getNumMessagesInBatch();
- if (batchSize <= 1){
- return -1;
- }
- if (metadata.hasCompression()) {
- var tmp = payload;
- CompressionType compressionType = metadata.getCompression();
- CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
- int uncompressedSize = metadata.getUncompressedSize();
- payload = codec.decode(payload, uncompressedSize);
- tmp.release();
- }
- SingleMessageMetadata singleMessageMetadata = new
SingleMessageMetadata();
- int lastBatchIndexInBatch = -1;
- for (int i = 0; i < batchSize; i++){
- ByteBuf singleMessagePayload =
- Commands.deSerializeSingleMessageInBatch(payload,
singleMessageMetadata, i, batchSize);
- singleMessagePayload.release();
- if (singleMessageMetadata.isCompactedOut()){
- continue;
- }
- lastBatchIndexInBatch = i;
- }
- return lastBatchIndexInBatch;
- }
-
private CompletableFuture<Boolean>
isNamespaceOperationAllowed(NamespaceName namespaceName,
NamespaceOperation operation) {
if (!service.isAuthorizationEnabled()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index d8c491dab29..64ddea3ec6a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -151,7 +151,7 @@ public class RawBatchConverter {
ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
try {
int batchSize = metadata.getNumMessagesInBatch();
- int messagesRetained = 0;
+ final var retainedBatchIndexes = new ArrayList<Integer>(); // indexes (within this batch) of the messages that are kept
SingleMessageMetadata emptyMetadata = new
SingleMessageMetadata().setCompactedOut(true);
SingleMessageMetadata singleMessageMetadata = new
SingleMessageMetadata();
@@ -169,7 +169,7 @@ public class RawBatchConverter {
Unpooled.EMPTY_BUFFER, batchBuffer);
} else if (!singleMessageMetadata.hasPartitionKey()) {
if (retainNullKey) {
- messagesRetained++;
+ retainedBatchIndexes.add(i);
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
} else {
@@ -178,7 +178,7 @@ public class RawBatchConverter {
}
} else if
(filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
- messagesRetained++;
+ retainedBatchIndexes.add(i);
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
singleMessagePayload, batchBuffer);
} else {
@@ -189,10 +189,14 @@ public class RawBatchConverter {
singleMessagePayload.release();
}
- if (messagesRetained > 0) {
+ if (!retainedBatchIndexes.isEmpty()) {
int newUncompressedSize = batchBuffer.readableBytes();
ByteBuf compressedPayload = codec.encode(batchBuffer);
+ metadata.clearCompactedBatchIndexes(); // replace any indexes carried by the input metadata with the retained ones
+ for (int index : retainedBatchIndexes) {
+ metadata.addCompactedBatchIndexe(index);
+ }
metadata.setUncompressedSize(newUncompressedSize);
ByteBuf metadataAndPayload =
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index cc317fa99f6..db1aecf3887 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -345,18 +345,31 @@ public class CompactedTopicImpl implements CompactedTopic
{
var compactedTopicContextFuture =
this.getCompactedTopicContextFuture();
if (compactedTopicContextFuture == null) {
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.failedFuture(new IllegalStateException(
+ "CompactedTopicContext is not initialized"));
}
return compactedTopicContextFuture.thenCompose(compactedTopicContext
-> {
LedgerHandle lh = compactedTopicContext.getLedger();
CompletableFuture<Long> promise = new CompletableFuture<>();
findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(),
promise, null, lh);
return promise.thenCompose(index -> {
if (index == null) {
- return CompletableFuture.completedFuture(null);
+ // The search is seeded with a null match index, so null means no entry satisfied the predicate.
+ // Fail explicitly instead of handing a null index to readEntries.
+ return CompletableFuture.failedFuture(new java.util.NoSuchElementException(
+ "No entry of the compacted ledger " + lh + " matches the predicate"));
}
- return readEntries(lh, index, index).thenApply(entries ->
entries.get(0));
+ return readEntries(lh, index, index).thenApply(entries -> {
+ if (entries.size() != 1) {
+ for (final var entry : entries) {
+ entry.release();
+ }
+ throw new IllegalStateException("Read " + entries.size() +
" entries from the compacted ledger "
+ + lh + " entry " + index);
+ }
+ return entries.get(0);
+ });
});
});
}
private static void findFirstMatchIndexLoop(final Predicate<Entry>
predicate,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
index e123c11fd79..ea724cb6480 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -22,18 +22,22 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static
org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
import static
org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.jspecify.annotations.NonNull;
@@ -98,34 +102,83 @@ public class PulsarTopicCompactionService implements
TopicCompactionService {
return resultFuture;
}
- @Override
- public CompletableFuture<Entry> readLastCompactedEntry() {
- return compactedTopic.readLastEntryOfCompactedLedger();
- }
-
@Override
public CompletableFuture<Position> getLastCompactedPosition() {
return
CompletableFuture.completedFuture(compactedTopic.getCompactionHorizon().orElse(null));
}
@Override
- public CompletableFuture<Entry> findEntryByPublishTime(long publishTime) {
+ public CompletableFuture<Position> findEntryByPublishTime(long
publishTime) {
final Predicate<Entry> predicate = entry -> {
return
Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >=
publishTime;
};
- return compactedTopic.findFirstMatchEntry(predicate);
+ return compactedTopic.findFirstMatchEntry(predicate).thenApply(entry
-> {
+ try {
+ return PositionFactory.create(entry.getLedgerId(),
entry.getEntryId());
+ } finally {
+ entry.release();
+ }
+ });
}
@Override
- public CompletableFuture<Entry> findEntryByEntryIndex(long entryIndex) {
- final Predicate<Entry> predicate = entry -> {
- BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
- if (brokerEntryMetadata == null ||
!brokerEntryMetadata.hasIndex()) {
- return false;
+ public CompletableFuture<MessagePosition> getLastMessagePosition() {
+ return compactedTopic.readLastEntryOfCompactedLedger().thenApply(entry
-> {
+ if (entry == null) {
+ return MessagePosition.EARLIEST;
}
- return brokerEntryMetadata.getIndex() >= entryIndex;
- };
- return compactedTopic.findFirstMatchEntry(predicate);
+ try {
+ final var payload = entry.getDataBuffer();
+ final var metadata = Commands.parseMessageMetadata(payload);
+ final var batchSize = metadata.getNumMessagesInBatch();
+ final var publishTime = metadata.getPublishTime();
+ if (batchSize <= 1) {
+ return new MessagePosition(entry.getLedgerId(),
entry.getEntryId(), -1, publishTime);
+ }
+ final int compactedBatchIndexesCount =
metadata.getCompactedBatchIndexesCount();
+ if (compactedBatchIndexesCount > 0) {
+ final var batchIndex =
metadata.getCompactedBatchIndexeAt(compactedBatchIndexesCount - 1);
+ return new MessagePosition(entry.getLedgerId(),
entry.getEntryId(), batchIndex, publishTime);
+ }
+ // Encrypted messages won't be compacted
+ if (metadata.getEncryptionKeysCount() > 0) {
+ return new MessagePosition(entry.getLedgerId(),
entry.getEntryId(), batchSize - 1, publishTime);
+ }
+ // No compacted_batch_indexes (e.g. a batch written by an older compaction service): scan the batch for the
+ // last message not marked compacted_out. The scan must read the decompressed buffer, not the raw payload.
+ final ByteBuf uncompressedPayload;
+ if (metadata.hasCompression()) {
+ final var codec =
CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
+ final var uncompressedSize =
metadata.getUncompressedSize();
+ uncompressedPayload = codec.decode(payload,
uncompressedSize);
+ } else {
+ uncompressedPayload = payload.retain();
+ }
+ try {
+ SingleMessageMetadata singleMessageMetadata = new
SingleMessageMetadata();
+ int batchIndex = -1;
+ for (int i = 0; i < batchSize; i++){
+ final var singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+ singleMessageMetadata, i, batchSize);
+ singleMessagePayload.release();
+ if (singleMessageMetadata.isCompactedOut()){
+ continue;
+ }
+ batchIndex = i;
+ }
+ if (batchIndex < 0) {
+ throw new IllegalStateException("No valid message in
entry " + entry.getPosition());
+ }
+ return new MessagePosition(entry.getLedgerId(),
entry.getEntryId(), batchIndex, publishTime);
+ } finally {
+ uncompressedPayload.release();
+ }
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ } finally {
+ entry.release();
+ }
+ });
}
public CompactedTopicImpl getCompactedTopic() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
index 11ce916f3ea..6153f4cfa89 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
@@ -47,13 +47,6 @@ public interface TopicCompactionService extends
AutoCloseable {
*/
CompletableFuture<List<Entry>> readCompactedEntries(@NonNull Position
startPosition, int numberOfEntriesToRead);
- /**
- * Read the last compacted entry from the TopicCompactionService.
- *
- * @return a future that will be completed with the compacted last entry,
this entry can be null.
- */
- CompletableFuture<Entry> readLastCompactedEntry();
-
/**
* Get the last compacted position from the TopicCompactionService.
*
@@ -67,14 +60,28 @@ public interface TopicCompactionService extends
AutoCloseable {
* @param publishTime the publish time of entry.
- * @return the first entry metadata that greater or equal to target
publishTime, this entry can be null.
+ * @return a future that completes with the position of the first entry whose publish time is greater than
+ * or equal to {@code publishTime}; it may complete exceptionally if no such entry can be found.
*/
- CompletableFuture<Entry> findEntryByPublishTime(long publishTime);
+ CompletableFuture<Position> findEntryByPublishTime(long publishTime);
/**
- * Find the first entry that greater or equal to target entryIndex,
- * if an entry that broker entry metadata is missed, then it will be
skipped and find the next match entry.
- *
- * @param entryIndex the index of entry.
- * @return the first entry that greater or equal to target entryIndex, this
entry can be null.
- */
- CompletableFuture<Entry> findEntryByEntryIndex(long entryIndex);
+ * Retrieve the position of the last message retained in the compacted ledger.
+ *
+ * @return A future that completes with the position of the last message
retained in the compacted ledger, or
+ * {@link MessagePosition#EARLIEST} if no such message exists.
+ */
+ CompletableFuture<MessagePosition> getLastMessagePosition();
+
+ /**
+ * Represents the position of a message.
+ * <p>
+ * The `ledgerId` and `entryId` together specify the exact entry to which
the message belongs. For batched messages,
+ * the `batchIndex` field indicates the index of the message within the
batch. If the message is not part of a
+ * batch, the `batchIndex` field is set to -1. The `publishTime` field
corresponds to the publishing time of the
+ * entry's metadata, providing a timestamp for when the entry was
published.
+ * </p>
+ */
+ record MessagePosition(long ledgerId, long entryId, int batchIndex, long
publishTime) {
+ /** Placeholder used when there is no message: ledgerId, entryId and batchIndex are -1, publishTime is 0. */
+ public static final MessagePosition EARLIEST = new
MessagePosition(-1L, -1L, -1, 0);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index c7deab62460..0aeb49312be 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -45,7 +45,6 @@ import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -400,11 +399,11 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
});
Position lastCompactedPosition =
topicCompactionService.getLastCompactedPosition().get();
- Entry lastCompactedEntry =
topicCompactionService.readLastCompactedEntry().get();
+ final var lastMessagePosition =
topicCompactionService.getLastMessagePosition().get();
Assert.assertTrue(PositionFactory.create(lastCompactedPosition.getLedgerId(),
-
lastCompactedPosition.getEntryId()).compareTo(lastCompactedEntry.getLedgerId(),
- lastCompactedEntry.getEntryId()) >= 0);
+
lastCompactedPosition.getEntryId()).compareTo(lastMessagePosition.ledgerId(),
+ lastMessagePosition.entryId()) >= 0); // the compaction horizon is not behind the last compacted message
future.join();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 252ee939aac..c5ee7a0a776 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -29,6 +29,8 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -39,6 +41,7 @@ import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -57,6 +60,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+@Slf4j
@Test(groups = "broker-impl")
public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
@@ -516,4 +520,43 @@ public class GetLastMessageIdCompactedTest extends
ProducerConsumerBase {
assertNotEquals(message, null);
}
}
+
+ @Test(timeOut = 30000)
+ public void testGetLastMessageIdForEncryptedMessage() throws Exception {
+ final var topic = BrokerTestUtil.newUniqueName("tp");
+ final var ecdsaPublicKeyFile =
"file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
+ final var ecdsaPrivateKeyFile =
"file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
+ @Cleanup final var producer =
pulsarClient.newProducer(Schema.STRING).topic(topic)
+ .batchingMaxBytes(Integer.MAX_VALUE)
+ .batchingMaxMessages(Integer.MAX_VALUE)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .addEncryptionKey("client-ecdsa.pem")
+ .defaultCryptoKeyReader(ecdsaPublicKeyFile)
+ .create();
+ // The batching limits above keep the four messages below in a single batch until flush()
+ producer.newMessage().key("k0").value("v0").sendAsync();
+ producer.newMessage().key("k0").value("v1").sendAsync();
+ producer.newMessage().key("k1").value("v0").sendAsync();
+ producer.newMessage().key("k1").value(null).sendAsync();
+ producer.flush();
+ triggerCompactionAndWait(topic);
+
+ @Cleanup final var consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub")
+
.readCompacted(true).defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
+ final var msgId = (MessageIdAdv) consumer.getLastMessageIds().get(0);
+ // Encrypted batches are not compacted, so all four messages are kept and the last one has batch index 3
+ assertEquals(msgId.getBatchIndex(), 3);
+
+ // Reading the compacted topic to the end must stop at the id reported by getLastMessageIds()
+ @Cleanup final var reader =
pulsarClient.newReader(Schema.STRING).topic(topic)
+
.startMessageId(MessageId.earliest).readCompacted(true)
+ .defaultCryptoKeyReader(ecdsaPrivateKeyFile).create();
+ MessageIdAdv readMsgId = (MessageIdAdv) MessageId.earliest;
+ while (reader.hasMessageAvailable()) {
+ final var msg = reader.readNext();
+ log.info("Read key: {}, value: {}", msg.getKey(),
Optional.ofNullable(msg.getValue()).orElse("(null)"));
+ readMsgId = (MessageIdAdv) msg.getMessageId();
+ }
+ assertEquals(readMsgId, msgId);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
index 8654e595c2a..da4ce33db40 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.compaction;
import static
org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.testng.Assert.assertEquals;
-import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.fail;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
@@ -40,11 +39,8 @@ import
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.protocol.Commands;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -173,29 +169,10 @@ public class TopicCompactionServiceTest extends
MockedPulsarServiceBaseTest {
List<Entry> entries2 =
service.readCompactedEntries(PositionFactory.EARLIEST, 1).join();
assertEquals(entries2.size(), 1);
-
- Entry entry = service.findEntryByEntryIndex(0).join();
- BrokerEntryMetadata brokerEntryMetadata =
Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer());
- assertNotNull(brokerEntryMetadata);
- assertEquals(brokerEntryMetadata.getIndex(), 2);
- MessageMetadata metadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
- assertEquals(metadata.getPartitionKey(), "a");
- entry.release();
-
- entry = service.findEntryByEntryIndex(3).join();
- brokerEntryMetadata =
Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer());
- assertNotNull(brokerEntryMetadata);
- assertEquals(brokerEntryMetadata.getIndex(), 4);
- metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
- assertEquals(metadata.getPartitionKey(), "b");
- entry.release();
-
- entry = service.findEntryByPublishTime(startTime).join();
- brokerEntryMetadata =
Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer());
- assertNotNull(brokerEntryMetadata);
- assertEquals(brokerEntryMetadata.getIndex(), 2);
- metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
- assertEquals(metadata.getPartitionKey(), "a");
- entry.release();
+
+ // startTime is presumably taken before any message is produced, so the first compacted entry is the first match
+ final var firstMatch = service.findEntryByPublishTime(startTime).join();
+ assertEquals(firstMatch.getLedgerId(), entries2.get(0).getLedgerId());
+ assertEquals(firstMatch.getEntryId(), entries2.get(0).getEntryId());
}
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index eacec33169e..a03daf05f99 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -163,6 +163,16 @@ message MessageMetadata {
// Indicate if the message partition key is set
optional bool null_partition_key = 30 [default = false];
+
+ // Indexes (positions in the original batch, in ascending order) of the messages retained after compaction.
+ // When a batch is compacted, some messages may be removed (compacted out). For example, if the original batch
+ // contains `k0 => v0, k1 => v1, k2 => v2, k1 => null`, only `k0 => v0` and `k2 => v2` are retained and this field
+ // is set to `[0, 2]`. Removed messages still occupy their slots in the payload as empty entries marked `compacted_out`.
+ //
+ // Note: Batches compacted by older versions of the compaction service do not include this field. For such batches,
+ // the `compacted_out` field in `SingleMessageMetadata` must be checked to identify and filter out compacted
+ // messages (e.g., `k1 => v1` and `k1 => null` in the example above).
+ repeated int32 compacted_batch_indexes = 31;
}
message SingleMessageMetadata {