nodece commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2126096493
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,131 @@ protected
CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
return null;
}));
}
+
+ protected CompletableFuture<MessageId>
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+ if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "GetMessageIDByIndex is not allowed when broker entry
metadata is disabled"));
+ }
+ if (index == null || index < 0) {
+ return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+ "Invalid message index: " + index));
+ }
+ int partitionIndex = topicName.getPartitionIndex();
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+ return future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> {
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
+ .thenAccept(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported
getMessageIdByIndex operation on "
+ + "partitioned-topic
{}", clientAppId(), topicName);
+ throw new
RestException(Status.METHOD_NOT_ALLOWED,
+ "GetMessageIDByIndex is not
allowed on partitioned-topic");
+ }
+ });
+ }
+ }).thenCompose(ignore ->
validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic persistentTopic)) {
+ log.error("[{}] Get message id by index on a
non-persistent topic {} is not allowed",
+ clientAppId(), topicName);
+ return FutureUtil.failedFuture(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Get message id by index on a non-persistent
topic is not allowed"));
+ }
+ ManagedLedger managedLedger =
persistentTopic.getManagedLedger();
+ return findMessageIndexByPosition(
+
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
+ managedLedger)
+ .thenCompose(firstIndex -> {
+ if (index < firstIndex) {
+ return
CompletableFuture.completedFuture(PositionFactory.EARLIEST);
+ } else {
+ return
managedLedger.asyncFindPosition(entry -> {
+ try {
+ Long messageIndex =
getIndexFromEntry(entry);
+ if (messageIndex == null) {
+ return false; // Skip messages
without index
+ } else {
+ // If the message index is
less than the requested index,
+ // we continue searching
+ return messageIndex < index;
+ }
+ } catch (IOException e) {
+ log.error("Error deserialize
message for message position find", e);
+ return false;
+ } finally {
+ entry.release();
+ }
+ });
+ }
+ }).thenCompose(position -> {
+ Position lastPosition =
managedLedger.getLastConfirmedEntry();
+ if (position == null ||
position.compareTo(lastPosition) > 0) {
+ return FutureUtil.failedFuture(new
RestException(Status.NOT_FOUND,
+ "Message not found for index " +
index));
+ } else {
+ return
CompletableFuture.completedFuture(position);
+ }
+ });
+ }).thenCompose(position -> CompletableFuture.completedFuture(
+ new MessageIdImpl(position.getLedgerId(),
position.getEntryId(), partitionIndex)));
+ }
+
+ protected CompletableFuture<Long> findMessageIndexByPosition(Position
position, ManagedLedger managedLedger) {
+ CompletableFuture<Long> indexFuture = new CompletableFuture<>();
+ managedLedger.asyncReadEntry(position, new
AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ try {
+ Long index = getIndexFromEntry(entry);
+ if (index == null) {
+ indexFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED,
+ "Broker entry metadata is not present in the
message"));
+ } else if (index < 0) {
+ indexFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED,
+ "Invalid message index: " + index));
+ } else {
+ indexFuture.complete(index);
+ }
+ } catch (IOException e) {
+ indexFuture.completeExceptionally(new
RestException(Status.INTERNAL_SERVER_ERROR,
+ "Failed to get index from entry: " +
e.getMessage()));
+ } finally {
+ entry.release();
+ }
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
+ log.error("[{}] Failed to read position {} on topic {}",
+ clientAppId(), position, topicName, exception);
+ indexFuture.completeExceptionally(exception);
+ }
+ }, null);
+ return indexFuture;
+ }
+
+
+ private static Long getIndexFromEntry(Entry entry) throws IOException {
+ try {
+ final var brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+ if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex())
{
+ return brokerEntryMetadata.getIndex();
+ } else {
+ return null;
+ }
+ } catch (Throwable throwable) {
+ throw new IOException(throwable);
Review Comment:
Wrapping all `Throwable` into an `IOException` here adds an extra layer
without clear benefit and may obscure the original exception type. Consider
removing the `try-catch` and letting exceptions propagate naturally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]