AuroraTwinkle commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2062805073


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,53 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
                             return null;
                         }));
     }
+
+    protected CompletableFuture<MessageId> internalGetMessageIDByOffsetAsync(Long offset, boolean authoritative) {

Review Comment:
   What happens if the index is not enabled?
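
A minimal sketch of the concern, assuming that `Commands.parseBrokerEntryMetadataIfExist` returns `null` when an entry carries no broker entry metadata (an assumption, not verified against this PR). `Metadata`, `Entry`, and `indexBelow` are hypothetical stand-ins, not Pulsar types; the point is only that the predicate could fail with a clear error instead of a `NullPointerException`.

```java
import java.util.function.Predicate;

class IndexGuardSketch {
    // Hypothetical stand-in for BrokerEntryMetadata; only the index is modelled.
    static final class Metadata {
        final long index;
        Metadata(long index) { this.index = index; }
    }

    // Hypothetical stand-in for an entry; metadata is null when the index is not enabled.
    static final class Entry {
        final Metadata metadata;
        Entry(Metadata metadata) { this.metadata = metadata; }
    }

    // Builds the "index < offset" predicate, but rejects entries without metadata explicitly.
    static Predicate<Entry> indexBelow(long offset) {
        return entry -> {
            if (entry.metadata == null) {
                throw new IllegalStateException("broker entry metadata index is not enabled");
            }
            return entry.metadata.index < offset;
        };
    }

    public static void main(String[] args) {
        Predicate<Entry> predicate = indexBelow(5L);
        System.out.println(predicate.test(new Entry(new Metadata(3L))));
        try {
            predicate.test(new Entry(null));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the real method, such a failure would presumably be mapped to a `RestException` so that the caller gets a meaningful status rather than an internal error.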



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,53 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
                             return null;
                         }));
     }
+
+    protected CompletableFuture<MessageId> internalGetMessageIDByOffsetAsync(Long offset, boolean authoritative) {
+        int partitionIndex = topicName.getPartitionIndex();
+        CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+        return future.thenCompose(__ -> {
+                    if (topicName.isGlobal()) {
+                        return validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                }).thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                                .thenAccept(topicMetadata -> {
+                                    if (topicMetadata.partitions > 0) {
+                                        log.warn("[{}] Not supported getMessageIdByOffset operation on "
+                                                        + "partitioned-topic {}", clientAppId(), topicName);
+                                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                                "GetMessageIDByOffset is not allowed on partitioned-topic");
+                                    }
+                                });
+                    }
+                }).thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+            if (!(topic instanceof PersistentTopic persistentTopic)) {
+                log.error("[{}] Get message id by offset on a non-persistent topic {} is not allowed",
+                        clientAppId(), topicName);
+                return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Get message id by offset on a non-persistent topic is not allowed"));
+            }
+            ManagedLedger managedLedger = persistentTopic.getManagedLedger();
+            return managedLedger.asyncFindPosition(entry -> {
+                try {
+                    BrokerEntryMetadata brokerEntryMetadata =
+                            Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+                    return brokerEntryMetadata.getIndex() < offset;

Review Comment:
   Maybe this should be `brokerEntryMetadata.getIndex() <= offset`?
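
To make the boundary question concrete, here is a small sketch under one stated assumption: the position search returns the slot right after the newest entry that matches the predicate. That is how I read `asyncFindPosition`, but it is not confirmed by this diff. `findPosition` is a hypothetical stand-in operating on a plain list of indexes, not the managed ledger API.

```java
import java.util.List;
import java.util.function.LongPredicate;

class OffsetPredicateSketch {
    // Assumed semantics: return the slot right after the newest matching entry,
    // or 0 when no entry matches.
    static int findPosition(List<Long> indexes, LongPredicate matches) {
        int position = 0;
        for (int i = 0; i < indexes.size(); i++) {
            if (matches.test(indexes.get(i))) {
                position = i + 1;
            }
        }
        return position;
    }

    public static void main(String[] args) {
        List<Long> indexes = List.of(0L, 1L, 2L, 3L, 4L);
        long offset = 2L;
        // Strict comparison: lands on the entry whose index equals the offset.
        int strict = findPosition(indexes, index -> index < offset);
        // Inclusive comparison: lands one entry past the offset.
        int inclusive = findPosition(indexes, index -> index <= offset);
        System.out.println("index <  offset -> slot " + strict);    // slot 2
        System.out.println("index <= offset -> slot " + inclusive); // slot 3
    }
}
```

Under that assumption, `<` resolves to the message at the requested offset and `<=` to the one after it, so the right operator depends on which of the two the API is meant to return.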


