BewareMyPower commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2115590549


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -5015,5 +5015,38 @@ public void removeAutoSubscriptionCreation(
                 });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")
+    @ApiOperation(hidden = true, value = "Get Message ID by index.",
+            notes = "If the specified index is a system message, "
+                    + "it will return the message id of the later message.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace or partitioned topic 
does not exist, "
+                    + "or the index is invalid"),
+            @ApiResponse(code = 406, message = "The topic is not a persistent 
topic"),
+            @ApiResponse(code = 412, message = "The broker is not enable 
broker entry metadata"),
+    })
+    public void getMessageIDByIndexAndPartitionID(@Suspended final 
AsyncResponse asyncResponse,

Review Comment:
   The `AndPartitionID` suffix in the method seems meaningless (and confusing)?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,114 @@ protected 
CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
                             return null;
                         }));
     }
+
+    protected CompletableFuture<MessageId> 
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+        if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "GetMessageIDByIndex is not allowed when broker entry 
metadata is disabled"));
+        }
+        if (index == null || index < 0) {
+            return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    "Invalid message index: " + index));
+        }
+        int partitionIndex = topicName.getPartitionIndex();
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+        return future.thenCompose(__ -> {
+                    if (topicName.isGlobal()) {
+                        return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                }).thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+                                .thenAccept(topicMetadata -> {
+                                    if (topicMetadata.partitions > 0) {
+                                        log.warn("[{}] Not supported 
getMessageIdByIndex operation on "
+                                                        + "partitioned-topic 
{}", clientAppId(), topicName);
+                                        throw new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                                "GetMessageIDByIndex is not 
allowed on partitioned-topic");
+                                    }
+                                });
+                    }
+                }).thenCompose(ignore -> 
validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic persistentTopic)) {
+                        log.error("[{}] Get message id by index on a 
non-persistent topic {} is not allowed",
+                                clientAppId(), topicName);
+                        return FutureUtil.failedFuture(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Get message id by index on a non-persistent 
topic is not allowed"));
+                    }
+                    ManagedLedger managedLedger = 
persistentTopic.getManagedLedger();
+                    return findMessageIndexByPosition(
+                            
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),

Review Comment:
   Why not just pass `managedLedger.getFirstPosition()` here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,114 @@ protected 
CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
                             return null;
                         }));
     }
+
+    protected CompletableFuture<MessageId> 
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+        if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                    "GetMessageIDByIndex is not allowed when broker entry 
metadata is disabled"));
+        }
+        if (index == null || index < 0) {
+            return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    "Invalid message index: " + index));
+        }
+        int partitionIndex = topicName.getPartitionIndex();
+        CompletableFuture<Void> future = 
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+        return future.thenCompose(__ -> {
+                    if (topicName.isGlobal()) {
+                        return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                }).thenCompose(__ -> {
+                    if (topicName.isPartitioned()) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+                                .thenAccept(topicMetadata -> {
+                                    if (topicMetadata.partitions > 0) {
+                                        log.warn("[{}] Not supported 
getMessageIdByIndex operation on "
+                                                        + "partitioned-topic 
{}", clientAppId(), topicName);
+                                        throw new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                                "GetMessageIDByIndex is not 
allowed on partitioned-topic");
+                                    }
+                                });
+                    }
+                }).thenCompose(ignore -> 
validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic persistentTopic)) {
+                        log.error("[{}] Get message id by index on a 
non-persistent topic {} is not allowed",
+                                clientAppId(), topicName);
+                        return FutureUtil.failedFuture(new 
RestException(Status.METHOD_NOT_ALLOWED,
+                                "Get message id by index on a non-persistent 
topic is not allowed"));
+                    }
+                    ManagedLedger managedLedger = 
persistentTopic.getManagedLedger();
+                    return findMessageIndexByPosition(
+                            
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
+                            managedLedger)
+                            .thenCompose(firstIndex -> {
+                                if (index < firstIndex) {
+                                    return 
CompletableFuture.completedFuture(PositionFactory.EARLIEST);
+                                } else {
+                                    return 
managedLedger.asyncFindPosition(entry -> {
+                                        try {
+                                            BrokerEntryMetadata 
brokerEntryMetadata =
+                                                    
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+                                            // Skip messages without index
+                                            if (brokerEntryMetadata == null) {
+                                                return true;
+                                            }
+                                            return 
brokerEntryMetadata.getIndex() < index;
+                                        } catch (Exception e) {
+                                            log.error("Error deserialize 
message for message position find", e);
+                                        } finally {
+                                            entry.release();
+                                        }
+                                        return false;
+                                    });
+                                }
+                            }).thenCompose(position -> {
+                                Position lastPosition = 
managedLedger.getLastConfirmedEntry();
+                                if (position == null || 
position.compareTo(lastPosition) > 0) {
+                                    return FutureUtil.failedFuture(new 
RestException(Status.NOT_FOUND,
+                                            "Message not found for index " + 
index));
+                                } else {
+                                    return 
CompletableFuture.completedFuture(position);
+                                }
+                            });
+                }).thenCompose(position -> CompletableFuture.completedFuture(
+                        new MessageIdImpl(position.getLedgerId(), 
position.getEntryId(), partitionIndex)));
+    }
+
+    protected CompletableFuture<Long> findMessageIndexByPosition(Position 
position, ManagedLedger managedLedger) {
+        CompletableFuture<Long> indexFuture = new CompletableFuture<>();
+        managedLedger.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                BrokerEntryMetadata brokerEntryMetadata =
+                        
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+                if (brokerEntryMetadata == null) {
+                    indexFuture.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED,
+                            "Broker entry metadata is not present in the 
message"));
+                } else {
+                    long index = brokerEntryMetadata.getIndex();
+                    if (index < 0) {
+                        indexFuture.completeExceptionally(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Invalid message index: " + index));
+                    } else {
+                        indexFuture.complete(index);
+                    }
+                }

Review Comment:
   You must release `entry` here.
   
   I think you can add a common method for both `asyncReadEntry` and 
`asyncFindPosition` to use. Here is an example:
   
   ```java
       private static Optional<Long> getIndexAndRelease(Entry entry) throws 
IOException {
           try {
               final var brokerEntryMetadata = 
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
               if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
                   return Optional.of(brokerEntryMetadata.getIndex());
               } else {
                   return Optional.empty();
               }
           } catch (Throwable throwable) {
               throw new IOException(throwable);
           }
       }
   ```
   
   The implementation above forces the caller side to handle the two 
exceptional cases:
   - `Optional.empty()` is returned, which indicates the broker entry metadata 
does not exist or the index does not exist (your code does not take it into 
consideration)
   - The buffer is corrupted, where `IOException` will be thrown
   
   But you can implement your own method if you care much about the overhead of 
`Optional` or don't want to use checked exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to