liangyepianzhou commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2115240448


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,111 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
                             return null;
                         }));
     }
+
+    protected CompletableFuture<MessageId> internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+        if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "GetMessageIDByIndex is not allowed when broker entry metadata is disabled"));
+        }
+        if (index == null || index < 0) {
+            return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    "Invalid message index: " + index));
+        }
+        int partitionIndex = topicName.getPartitionIndex();
+        CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+        return future.thenCompose(__ -> {
+                    if (topicName.isGlobal()) {
+                        return validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                }).thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                                .thenAccept(topicMetadata -> {
+                                    if (topicMetadata.partitions > 0) {
+                                        log.warn("[{}] Not supported getMessageIdByIndex operation on "
+                                                        + "partitioned-topic {}", clientAppId(), topicName);
+                                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                                "GetMessageIDByIndex is not allowed on partitioned-topic");
+                                    }
+                                });
+                    }
+                }).thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic persistentTopic)) {
+                        log.error("[{}] Get message id by index on a non-persistent topic {} is not allowed",
+                                clientAppId(), topicName);
+                        return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Get message id by index on a non-persistent topic is not allowed"));
+                    }
+                    ManagedLedger managedLedger = persistentTopic.getManagedLedger();
+                    return findMessageIndexByPosition(
+                            PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
+                            managedLedger)
+                            .thenCompose(firstIndex -> {
+                                if (index < firstIndex) {
+                                    return CompletableFuture.completedFuture(PositionFactory.EARLIEST);
+                                } else {
+                                    return managedLedger.asyncFindPosition(entry -> {
+                                        try {
+                                            BrokerEntryMetadata brokerEntryMetadata =
+                                                    Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+                                            assert brokerEntryMetadata != null;

Review Comment:
   Good catch! Thanks.  
   
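   First, the method already checks that the broker configuration
   `isBrokerEntryMetadataEnabled = true`, so the entries in the topic should be
   laid out as either **OOOOOO** (every entry has an index) or **XXXOOO** (the
   oldest entries, written before the setting was enabled, have no index).  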
   
   Second, in the **XXXOOO** scenario, we cannot tell whether the user-provided
   index points to an **X** or an **O** entry until the lookup completes, so
   failing the request up front might not be ideal. For example, if a user's
   message retention time is set to 7 days or even a month, they would be unable
   to use this API for that entire period after enabling
   `isBrokerEntryMetadataEnabled`, even if they only want to look up the latest
   message ID by index.  
   
   The example in
   [https://github.com/streamnative/kop/pull/916](https://github.com/streamnative/kop/pull/916)
   is correct: we can skip messages that have no index. Additionally, this
   option could be set to `true` by default, since, as you mentioned, adding an
   extra option would make this API harder to understand.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
