This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d2728253c66 [improve][broker] PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload Buffer Parsing (#24523)
d2728253c66 is described below

commit d2728253c666f7d4bd4f111356f3b97663603b6a
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Aug 4 13:46:03 2025 +0800

    [improve][broker] PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload Buffer Parsing (#24523)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 31 +++-----
 .../apache/pulsar/broker/service/ServerCnx.java    | 77 +++-----------------
 .../pulsar/client/impl/RawBatchConverter.java      | 12 ++--
 .../pulsar/compaction/CompactedTopicImpl.java      | 17 +++--
 .../compaction/PulsarTopicCompactionService.java   | 83 +++++++++++++++++-----
 .../pulsar/compaction/TopicCompactionService.java  | 36 ++++++----
 .../apache/pulsar/compaction/CompactorTest.java    |  7 +-
 .../compaction/GetLastMessageIdCompactedTest.java  | 41 +++++++++++
 .../compaction/TopicCompactionServiceTest.java     | 28 --------
 pulsar-common/src/main/proto/PulsarApi.proto       | 10 +++
 10 files changed, 177 insertions(+), 165 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 459d82cede8..3b74e19d288 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2837,31 +2837,16 @@ public class PersistentTopicsBase extends AdminResource 
{
                         "Get message ID by timestamp on a non-persistent topic 
is not allowed");
                 }
                 final PersistentTopic persistentTopic = (PersistentTopic) 
topic;
+                final var compactionService = 
persistentTopic.getTopicCompactionService();
 
-                return 
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry
 -> {
-                    if (lastEntry == null) {
-                        return findMessageIdByPublishTime(timestamp, 
persistentTopic.getManagedLedger());
-                    }
-                    MessageMetadata metadata;
-                    Position position = lastEntry.getPosition();
-                    try {
-                        metadata = 
Commands.parseMessageMetadata(lastEntry.getDataBuffer());
-                    } finally {
-                        lastEntry.release();
-                    }
-                    if (timestamp == metadata.getPublishTime()) {
-                        return CompletableFuture.completedFuture(new 
MessageIdImpl(position.getLedgerId(),
-                                position.getEntryId(), 
topicName.getPartitionIndex()));
-                    } else if (timestamp < metadata.getPublishTime()) {
+                return 
compactionService.getLastMessagePosition().thenCompose(messagePosition -> {
+                    if (timestamp == messagePosition.publishTime()) {
+                        return CompletableFuture.completedFuture(new 
MessageIdImpl(messagePosition.ledgerId(),
+                                messagePosition.entryId(), 
topicName.getPartitionIndex()));
+                    } else if (timestamp < messagePosition.publishTime()) {
                         return 
persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
-                                .thenApply(compactedEntry -> {
-                                    try {
-                                        return new 
MessageIdImpl(compactedEntry.getLedgerId(),
-                                                compactedEntry.getEntryId(), 
topicName.getPartitionIndex());
-                                    } finally {
-                                        compactedEntry.release();
-                                    }
-                                });
+                                .thenApply(__ -> new 
MessageIdImpl(__.getLedgerId(), __.getEntryId(),
+                                        topicName.getPartitionIndex()));
                     } else {
                         return findMessageIdByPublishTime(timestamp, 
persistentTopic.getManagedLedger());
                     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 71cbefdcb5c..e2995a0a2cf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,7 +41,6 @@ import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.FastThreadLocal;
 import io.netty.util.concurrent.Promise;
 import io.netty.util.concurrent.ScheduledFuture;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -137,7 +136,6 @@ import 
org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
 import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
-import org.apache.pulsar.common.api.proto.CompressionType;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -148,10 +146,7 @@ import 
org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.Schema;
 import org.apache.pulsar.common.api.proto.ServerError;
-import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.Metadata;
@@ -2275,7 +2270,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                  .thenApply(lastPosition -> {
                      int partitionIndex = 
TopicName.getPartitionIndex(topic.getName());
 
-                     Position markDeletePosition = null;
+                     Position markDeletePosition = PositionFactory.EARLIEST;
                      if (consumer.getSubscription() instanceof 
PersistentSubscription) {
                          markDeletePosition = ((PersistentSubscription) 
consumer.getSubscription()).getCursor()
                                  .getMarkDeletedPosition();
@@ -2336,8 +2331,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 } else {
                     // if readCompacted is false, we need to return 
MessageId.earliest
                     
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, 
partitionIndex, -1,
-                            markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                            markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+                            markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId()));
                 }
                 return;
             }
@@ -2396,8 +2390,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                     
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
lastPosition.getLedgerId(),
                             lastPosition.getEntryId(), partitionIndex, 
largestBatchIndex,
-                            markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                            markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+                            markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId()));
                 }
             });
         });
@@ -2405,38 +2398,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     private void handleLastMessageIdFromCompactionService(PersistentTopic 
persistentTopic, long requestId,
                                                           int partitionIndex, 
Position markDeletePosition) {
-        
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry
 -> {
-            if (entry != null) {
-                try {
-                    // in this case, all the data has been compacted, so 
return the last position
-                    // in the compacted ledger to the client
-                    ByteBuf payload = entry.getDataBuffer();
-                    MessageMetadata metadata = 
Commands.parseMessageMetadata(payload);
-                    int largestBatchIndex;
-                    try {
-                        largestBatchIndex = 
calculateTheLastBatchIndexInBatch(metadata, payload);
-                    } catch (IOException ioEx) {
-                        writeAndFlush(Commands.newError(requestId, 
ServerError.MetadataError,
-                                "Failed to deserialize batched message from 
the last entry of the compacted Ledger: "
-                                        + ioEx.getMessage()));
-                        return;
-                    }
-                    
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                            entry.getLedgerId(), entry.getEntryId(), 
partitionIndex, largestBatchIndex,
-                            markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                            markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
-                } finally {
-                    entry.release();
-                }
-            } else {
-                // in this case, the ledgers been removed except the current 
ledger
-                // and current ledger without any data
-                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-                        -1, -1, partitionIndex, -1,
-                        markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
-                        markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
-            }
-        }).exceptionally(ex -> {
+        
persistentTopic.getTopicCompactionService().getLastMessagePosition().thenAccept(position
 ->
+                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
position.ledgerId(), position.entryId(),
+                        partitionIndex, position.batchIndex(), 
markDeletePosition.getLedgerId(),
+                        markDeletePosition.getEntryId()))
+        ).exceptionally(ex -> {
             writeAndFlush(Commands.newError(
                     requestId, ServerError.MetadataError,
                     "Failed to read last entry of the compacted Ledger "
@@ -2445,33 +2411,6 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         });
     }
 
-    private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, 
ByteBuf payload) throws IOException {
-        int batchSize = metadata.getNumMessagesInBatch();
-        if (batchSize <= 1){
-            return -1;
-        }
-        if (metadata.hasCompression()) {
-            var tmp = payload;
-            CompressionType compressionType = metadata.getCompression();
-            CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
-            int uncompressedSize = metadata.getUncompressedSize();
-            payload = codec.decode(payload, uncompressedSize);
-            tmp.release();
-        }
-        SingleMessageMetadata singleMessageMetadata = new 
SingleMessageMetadata();
-        int lastBatchIndexInBatch = -1;
-        for (int i = 0; i < batchSize; i++){
-            ByteBuf singleMessagePayload =
-                    Commands.deSerializeSingleMessageInBatch(payload, 
singleMessageMetadata, i, batchSize);
-            singleMessagePayload.release();
-            if (singleMessageMetadata.isCompactedOut()){
-                continue;
-            }
-            lastBatchIndexInBatch = i;
-        }
-        return lastBatchIndexInBatch;
-    }
-
     private CompletableFuture<Boolean> 
isNamespaceOperationAllowed(NamespaceName namespaceName,
                                                                    
NamespaceOperation operation) {
         if (!service.isAuthorizationEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index d8c491dab29..64ddea3ec6a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -151,7 +151,7 @@ public class RawBatchConverter {
         ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
         try {
             int batchSize = metadata.getNumMessagesInBatch();
-            int messagesRetained = 0;
+            final var retainedBatchIndexes = new ArrayList<Integer>();
 
             SingleMessageMetadata emptyMetadata = new 
SingleMessageMetadata().setCompactedOut(true);
             SingleMessageMetadata singleMessageMetadata = new 
SingleMessageMetadata();
@@ -169,7 +169,7 @@ public class RawBatchConverter {
                             Unpooled.EMPTY_BUFFER, batchBuffer);
                 } else if (!singleMessageMetadata.hasPartitionKey()) {
                     if (retainNullKey) {
-                        messagesRetained++;
+                        retainedBatchIndexes.add(i);
                         
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
                                 singleMessagePayload, batchBuffer);
                     } else {
@@ -178,7 +178,7 @@ public class RawBatchConverter {
                     }
                 } else if 
(filter.test(singleMessageMetadata.getPartitionKey(), id)
                            && singleMessagePayload.readableBytes() > 0) {
-                    messagesRetained++;
+                    retainedBatchIndexes.add(i);
                     
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
                                                                       
singleMessagePayload, batchBuffer);
                 } else {
@@ -189,10 +189,14 @@ public class RawBatchConverter {
                 singleMessagePayload.release();
             }
 
-            if (messagesRetained > 0) {
+            if (!retainedBatchIndexes.isEmpty()) {
                 int newUncompressedSize = batchBuffer.readableBytes();
                 ByteBuf compressedPayload = codec.encode(batchBuffer);
 
+                metadata.clearCompactedBatchIndexes();
+                for (int index : retainedBatchIndexes) {
+                    metadata.addCompactedBatchIndexe(index);
+                }
                 metadata.setUncompressedSize(newUncompressedSize);
 
                 ByteBuf metadataAndPayload = 
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index cc317fa99f6..db1aecf3887 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -345,18 +345,23 @@ public class CompactedTopicImpl implements CompactedTopic 
{
         var compactedTopicContextFuture = 
this.getCompactedTopicContextFuture();
 
         if (compactedTopicContextFuture == null) {
-            return CompletableFuture.completedFuture(null);
+            return CompletableFuture.failedFuture(new IllegalStateException(
+                    "CompactedTopicContext is not initialized"));
         }
         return compactedTopicContextFuture.thenCompose(compactedTopicContext 
-> {
             LedgerHandle lh = compactedTopicContext.getLedger();
             CompletableFuture<Long> promise = new CompletableFuture<>();
             findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), 
promise, null, lh);
-            return promise.thenCompose(index -> {
-                if (index == null) {
-                    return CompletableFuture.completedFuture(null);
+            return promise.thenCompose(index -> readEntries(lh, index, 
index).thenApply(entries -> {
+                if (entries.size() != 1) {
+                    for (final var entry : entries) {
+                        entry.release();
+                    }
+                    throw new IllegalStateException("Read " + entries.size() + 
" entries from the compacted ledger "
+                            + lh + " entry " + index);
                 }
-                return readEntries(lh, index, index).thenApply(entries -> 
entries.get(0));
-            });
+                return entries.get(0);
+            }));
         });
     }
     private static void findFirstMatchIndexLoop(final Predicate<Entry> 
predicate,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
index e123c11fd79..ea724cb6480 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -22,18 +22,22 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
 import static 
org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
 import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.jspecify.annotations.NonNull;
@@ -98,34 +102,81 @@ public class PulsarTopicCompactionService implements 
TopicCompactionService {
         return resultFuture;
     }
 
-    @Override
-    public CompletableFuture<Entry> readLastCompactedEntry() {
-        return compactedTopic.readLastEntryOfCompactedLedger();
-    }
-
     @Override
     public CompletableFuture<Position> getLastCompactedPosition() {
         return 
CompletableFuture.completedFuture(compactedTopic.getCompactionHorizon().orElse(null));
     }
 
     @Override
-    public CompletableFuture<Entry> findEntryByPublishTime(long publishTime) {
+    public CompletableFuture<Position> findEntryByPublishTime(long 
publishTime) {
         final Predicate<Entry> predicate = entry -> {
             return 
Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >= 
publishTime;
         };
-        return compactedTopic.findFirstMatchEntry(predicate);
+        return compactedTopic.findFirstMatchEntry(predicate).thenApply(entry 
-> {
+            try {
+                return PositionFactory.create(entry.getLedgerId(), 
entry.getEntryId());
+            } finally {
+                entry.release();
+            }
+        });
     }
 
     @Override
-    public CompletableFuture<Entry> findEntryByEntryIndex(long entryIndex) {
-        final Predicate<Entry> predicate = entry -> {
-            BrokerEntryMetadata brokerEntryMetadata = 
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
-            if (brokerEntryMetadata == null || 
!brokerEntryMetadata.hasIndex()) {
-                return false;
+    public CompletableFuture<MessagePosition> getLastMessagePosition() {
+        return compactedTopic.readLastEntryOfCompactedLedger().thenApply(entry 
-> {
+            if (entry == null) {
+                return MessagePosition.EARLIEST;
             }
-            return brokerEntryMetadata.getIndex() >= entryIndex;
-        };
-        return compactedTopic.findFirstMatchEntry(predicate);
+            try {
+                final var payload = entry.getDataBuffer();
+                final var metadata = Commands.parseMessageMetadata(payload);
+                final var batchSize = metadata.getNumMessagesInBatch();
+                final var publishTime = metadata.getPublishTime();
+                if (batchSize <= 1) {
+                    return new MessagePosition(entry.getLedgerId(), 
entry.getEntryId(), -1, publishTime);
+                }
+                final int compactedBatchIndexesCount = 
metadata.getCompactedBatchIndexesCount();
+                if (compactedBatchIndexesCount > 0) {
+                    final var batchIndex = 
metadata.getCompactedBatchIndexeAt(compactedBatchIndexesCount - 1);
+                    return new MessagePosition(entry.getLedgerId(), 
entry.getEntryId(), batchIndex, publishTime);
+                }
+                // Encrypted messages won't be compacted
+                if (metadata.getEncryptionKeysCount() > 0) {
+                    return new MessagePosition(entry.getLedgerId(), 
entry.getEntryId(), batchSize - 1, publishTime);
+                }
+                final ByteBuf uncompressedPayload;
+                if (metadata.hasCompression()) {
+                    final var codec = 
CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
+                    final var uncompressedSize = 
metadata.getUncompressedSize();
+                    uncompressedPayload = codec.decode(payload, 
uncompressedSize);
+                } else {
+                    uncompressedPayload = payload.retain();
+                }
+                try {
+                    SingleMessageMetadata singleMessageMetadata = new 
SingleMessageMetadata();
+                    int batchIndex = -1;
+                    for (int i = 0; i < batchSize; i++){
+                        final var singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(payload,
+                                singleMessageMetadata, i, batchSize);
+                        singleMessagePayload.release();
+                        if (singleMessageMetadata.isCompactedOut()){
+                            continue;
+                        }
+                        batchIndex = i;
+                    }
+                    if (batchIndex < 0) {
+                        throw new IllegalStateException("No valid message in 
entry " + entry.getPosition());
+                    }
+                    return new MessagePosition(entry.getLedgerId(), 
entry.getEntryId(), batchIndex, publishTime);
+                } finally {
+                    uncompressedPayload.release();
+                }
+            } catch (IOException e) {
+                throw new CompletionException(e);
+            } finally {
+                entry.release();
+            }
+        });
     }
 
     public CompactedTopicImpl getCompactedTopic() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
index 11ce916f3ea..6153f4cfa89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java
@@ -47,13 +47,6 @@ public interface TopicCompactionService extends 
AutoCloseable {
      */
     CompletableFuture<List<Entry>> readCompactedEntries(@NonNull Position 
startPosition, int numberOfEntriesToRead);
 
-    /**
-     * Read the last compacted entry from the TopicCompactionService.
-     *
-     * @return a future that will be completed with the compacted last entry, 
this entry can be null.
-     */
-    CompletableFuture<Entry> readLastCompactedEntry();
-
     /**
      * Get the last compacted position from the TopicCompactionService.
      *
@@ -67,14 +60,27 @@ public interface TopicCompactionService extends 
AutoCloseable {
     * @param publishTime  the publish time of entry.
     * @return the first entry metadata that greater or equal to target 
publishTime, this entry can be null.
     */
-    CompletableFuture<Entry> findEntryByPublishTime(long publishTime);
+    CompletableFuture<Position> findEntryByPublishTime(long publishTime);
 
     /**
-    * Find the first entry that greater or equal to target entryIndex,
-    * if an entry that broker entry metadata is missed, then it will be 
skipped and find the next match entry.
-    *
-    * @param entryIndex  the index of entry.
-    * @return the first entry that greater or equal to target entryIndex, this 
entry can be null.
-    */
-    CompletableFuture<Entry> findEntryByEntryIndex(long entryIndex);
+     * Retrieve the position of the last message before compaction.
+     *
+     * @return A future that completes with the position of the last message 
before compaction, or
+     *         {@link MessagePosition#EARLIEST} if no such message exists.
+     */
+    CompletableFuture<MessagePosition> getLastMessagePosition();
+
+    /**
+     * Represents the position of a message.
+     * <p>
+     * The `ledgerId` and `entryId` together specify the exact entry to which 
the message belongs. For batched messages,
+     * the `batchIndex` field indicates the index of the message within the 
batch. If the message is not part of a
+     * batch, the `batchIndex` field is set to -1. The `publishTime` field 
corresponds to the publishing time of the
+     * entry's metadata, providing a timestamp for when the entry was 
published.
+     * </p>
+     */
+    record MessagePosition(long ledgerId, long entryId, int batchIndex, long 
publishTime) {
+
+        public static final MessagePosition EARLIEST = new 
MessagePosition(-1L, -1L, 0, 0);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index c7deab62460..0aeb49312be 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -45,7 +45,6 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -400,11 +399,11 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
         });
 
         Position lastCompactedPosition = 
topicCompactionService.getLastCompactedPosition().get();
-        Entry lastCompactedEntry = 
topicCompactionService.readLastCompactedEntry().get();
+        final var lastMessagePosition = 
topicCompactionService.getLastMessagePosition().get();
 
         
Assert.assertTrue(PositionFactory.create(lastCompactedPosition.getLedgerId(),
-                
lastCompactedPosition.getEntryId()).compareTo(lastCompactedEntry.getLedgerId(),
-                lastCompactedEntry.getEntryId()) >= 0);
+                
lastCompactedPosition.getEntryId()).compareTo(lastMessagePosition.ledgerId(),
+                lastMessagePosition.entryId()) >= 0);
 
         future.join();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 252ee939aac..c5ee7a0a776 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -29,6 +29,8 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -39,6 +41,7 @@ import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -57,6 +60,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker-impl")
 public class GetLastMessageIdCompactedTest extends ProducerConsumerBase {
 
@@ -516,4 +520,41 @@ public class GetLastMessageIdCompactedTest extends 
ProducerConsumerBase {
             assertNotEquals(message, null);
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testGetLastMessageIdForEncryptedMessage() throws Exception {
+        final var topic = BrokerTestUtil.newUniqueName("tp");
+        final var ecdsaPublickeyFile = 
"file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
+        final String ecdsaPrivateKeyFile = 
"file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
+        @Cleanup final var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic)
+                .batchingMaxBytes(Integer.MAX_VALUE)
+                .batchingMaxMessages(Integer.MAX_VALUE)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .addEncryptionKey("client-ecdsa.pem")
+                .defaultCryptoKeyReader(ecdsaPublickeyFile)
+                .create();
+        producer.newMessage().key("k0").value("v0").sendAsync();
+        producer.newMessage().key("k0").value("v1").sendAsync();
+        producer.newMessage().key("k1").value("v0").sendAsync();
+        producer.newMessage().key("k1").value(null).sendAsync();
+        producer.flush();
+        triggerCompactionAndWait(topic);
+
+        @Cleanup final var consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName("sub")
+                
.readCompacted(true).defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
+        final var msgId = (MessageIdAdv) consumer.getLastMessageIds().get(0);
+        // Compaction does not work for encrypted messages
+        assertEquals(msgId.getBatchIndex(), 3);
+
+        @Cleanup final var reader = 
pulsarClient.newReader(Schema.STRING).topic(topic)
+                
.startMessageId(MessageId.earliest).topic(topic).readCompacted(true)
+                .defaultCryptoKeyReader(ecdsaPrivateKeyFile).create();
+        MessageIdAdv readMsgId = (MessageIdAdv) MessageId.earliest;
+        while (reader.hasMessageAvailable()) {
+            final var msg = reader.readNext();
+            log.info("Read key: {}, value: {}", msg.getKey(), 
Optional.ofNullable(msg.getValue()).orElse("(null)"));
+            readMsgId = (MessageIdAdv) msg.getMessageId();
+        }
+        assertEquals(readMsgId, msgId);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
index 8654e595c2a..da4ce33db40 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.compaction;
 import static 
org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import static org.testng.Assert.assertEquals;
-import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.fail;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
@@ -40,11 +39,8 @@ import 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.protocol.Commands;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -173,29 +169,5 @@ public class TopicCompactionServiceTest extends 
MockedPulsarServiceBaseTest {
 
         List<Entry> entries2 = 
service.readCompactedEntries(PositionFactory.EARLIEST, 1).join();
         assertEquals(entries2.size(), 1);
-
-        Entry entry = service.findEntryByEntryIndex(0).join();
-        BrokerEntryMetadata brokerEntryMetadata = 
Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer());
-        assertNotNull(brokerEntryMetadata);
-        assertEquals(brokerEntryMetadata.getIndex(), 2);
-        MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-        assertEquals(metadata.getPartitionKey(), "a");
-        entry.release();
-
-        entry = service.findEntryByEntryIndex(3).join();
-        brokerEntryMetadata = 
Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer());
-        assertNotNull(brokerEntryMetadata);
-        assertEquals(brokerEntryMetadata.getIndex(), 4);
-        metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
-        assertEquals(metadata.getPartitionKey(), "b");
-        entry.release();
-
-        entry = service.findEntryByPublishTime(startTime).join();
-        brokerEntryMetadata = 
Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer());
-        assertNotNull(brokerEntryMetadata);
-        assertEquals(brokerEntryMetadata.getIndex(), 2);
-        metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
-        assertEquals(metadata.getPartitionKey(), "a");
-        entry.release();
     }
 }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index eacec33169e..a03daf05f99 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -163,6 +163,16 @@ message MessageMetadata {
 
     // Indicate if the message partition key is set
     optional bool null_partition_key = 30 [default = false];
+
+    // Indicates the indexes of messages retained in the batch after 
compaction. When a batch is compacted,
+    // some messages may be removed (compacted out). For example, if the 
original batch contains:
+    // `k0 => v0, k1 => v1, k2 => v2, k1 => null`, the compacted batch will 
retain only `k0 => v0` and `k2 => v2`.
+    // In this case, this field will be set to `[0, 2]`, and the payload 
buffer will only include the retained messages.
+    //
+    // Note: Batches compacted by older versions of the compaction service do 
not include this field. For such batches,
+    // the `compacted_out` field in `SingleMessageMetadata` must be checked to 
identify and filter out compacted
+    // messages (e.g., `k1 => v1` and `k1 => null` in the example above).
+    repeated int32 compacted_batch_indexes = 31;
 }
 
 message SingleMessageMetadata {


Reply via email to