BewareMyPower commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2113922366
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,111 @@ protected
CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
return null;
}));
}
+
+ protected CompletableFuture<MessageId>
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+ if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "GetMessageIDByIndex is not allowed when broker entry
metadata is disabled"));
+ }
+ if (index == null || index < 0) {
+ return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+ "Invalid message index: " + index));
+ }
+ int partitionIndex = topicName.getPartitionIndex();
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+ return future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> {
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
+ .thenAccept(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported
getMessageIdByIndex operation on "
+ + "partitioned-topic
{}", clientAppId(), topicName);
+ throw new
RestException(Status.METHOD_NOT_ALLOWED,
+ "GetMessageIDByIndex is not
allowed on partitioned-topic");
+ }
+ });
+ }
+ }).thenCompose(ignore ->
validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic persistentTopic)) {
+ log.error("[{}] Get message id by index on a
non-persistent topic {} is not allowed",
+ clientAppId(), topicName);
+ return FutureUtil.failedFuture(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Get message id by index on a non-persistent
topic is not allowed"));
+ }
+ ManagedLedger managedLedger =
persistentTopic.getManagedLedger();
+ return findMessageIndexByPosition(
+
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
+ managedLedger)
+ .thenCompose(firstIndex -> {
+ if (index < firstIndex) {
+ return
CompletableFuture.completedFuture(PositionFactory.EARLIEST);
+ } else {
+ return
managedLedger.asyncFindPosition(entry -> {
+ try {
+ BrokerEntryMetadata
brokerEntryMetadata =
+
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+ assert brokerEntryMetadata != null;
Review Comment:
`assert` should not be used here. It should only work in tests via the `-ea`
option.
In production, if `asyncFindPosition` encounters an entry without
`BrokerEntryMetadata`, NPE will be thrown here.
First, many unnecessary error logs will be printed.
Second, returning false does not make sense, it will search to the left
side. If the topic includes entries like: XXXOO, where `X` is the entry without
index and `O` is the entry with index, eventually 0 will be returned.
Here are some solutions to handle this case:
- Fail the search directly if the topic contains entries without broker
entry metadata
- Add an option to control the behavior (return false to left or return true
to right). But this option assumes the entries distribution is either
XX...OO... or OO...XX....For entries like OXOXOX, it does not work. In
addition, an extra option makes this API hard to understand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]