BewareMyPower commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2115590549
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -5015,5 +5015,38 @@ public void removeAutoSubscriptionCreation(
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")
+ @ApiOperation(hidden = true, value = "Get Message ID by index.",
+ notes = "If the specified index is a system message, "
+ + "it will return the message id of the later message.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or partitioned topic
does not exist, "
+ + "or the index is invalid"),
+ @ApiResponse(code = 406, message = "The topic is not a persistent
topic"),
+ @ApiResponse(code = 412, message = "The broker is not enable
broker entry metadata"),
+ })
+ public void getMessageIDByIndexAndPartitionID(@Suspended final
AsyncResponse asyncResponse,
Review Comment:
The `AndPartitionID` suffix in the method name seems meaningless (and confusing)?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,114 @@ protected
CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
return null;
}));
}
+
+ protected CompletableFuture<MessageId>
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+ if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "GetMessageIDByIndex is not allowed when broker entry
metadata is disabled"));
+ }
+ if (index == null || index < 0) {
+ return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+ "Invalid message index: " + index));
+ }
+ int partitionIndex = topicName.getPartitionIndex();
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+ return future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> {
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
+ .thenAccept(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported
getMessageIdByIndex operation on "
+ + "partitioned-topic
{}", clientAppId(), topicName);
+ throw new
RestException(Status.METHOD_NOT_ALLOWED,
+ "GetMessageIDByIndex is not
allowed on partitioned-topic");
+ }
+ });
+ }
+ }).thenCompose(ignore ->
validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic persistentTopic)) {
+ log.error("[{}] Get message id by index on a
non-persistent topic {} is not allowed",
+ clientAppId(), topicName);
+ return FutureUtil.failedFuture(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Get message id by index on a non-persistent
topic is not allowed"));
+ }
+ ManagedLedger managedLedger =
persistentTopic.getManagedLedger();
+ return findMessageIndexByPosition(
+
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
Review Comment:
Why not just pass `managedLedger.getFirstPosition()` here?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5489,4 +5489,114 @@ protected
CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
return null;
}));
}
+
+ protected CompletableFuture<MessageId>
internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
+ if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "GetMessageIDByIndex is not allowed when broker entry
metadata is disabled"));
+ }
+ if (index == null || index < 0) {
+ return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+ "Invalid message index: " + index));
+ }
+ int partitionIndex = topicName.getPartitionIndex();
+ CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
+ return future.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ -> {
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return getPartitionedTopicMetadataAsync(topicName,
authoritative, false)
+ .thenAccept(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported
getMessageIdByIndex operation on "
+ + "partitioned-topic
{}", clientAppId(), topicName);
+ throw new
RestException(Status.METHOD_NOT_ALLOWED,
+ "GetMessageIDByIndex is not
allowed on partitioned-topic");
+ }
+ });
+ }
+ }).thenCompose(ignore ->
validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic persistentTopic)) {
+ log.error("[{}] Get message id by index on a
non-persistent topic {} is not allowed",
+ clientAppId(), topicName);
+ return FutureUtil.failedFuture(new
RestException(Status.METHOD_NOT_ALLOWED,
+ "Get message id by index on a non-persistent
topic is not allowed"));
+ }
+ ManagedLedger managedLedger =
persistentTopic.getManagedLedger();
+ return findMessageIndexByPosition(
+
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
+ managedLedger)
+ .thenCompose(firstIndex -> {
+ if (index < firstIndex) {
+ return
CompletableFuture.completedFuture(PositionFactory.EARLIEST);
+ } else {
+ return
managedLedger.asyncFindPosition(entry -> {
+ try {
+ BrokerEntryMetadata
brokerEntryMetadata =
+
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+ // Skip messages without index
+ if (brokerEntryMetadata == null) {
+ return true;
+ }
+ return
brokerEntryMetadata.getIndex() < index;
+ } catch (Exception e) {
+ log.error("Error deserialize
message for message position find", e);
+ } finally {
+ entry.release();
+ }
+ return false;
+ });
+ }
+ }).thenCompose(position -> {
+ Position lastPosition =
managedLedger.getLastConfirmedEntry();
+ if (position == null ||
position.compareTo(lastPosition) > 0) {
+ return FutureUtil.failedFuture(new
RestException(Status.NOT_FOUND,
+ "Message not found for index " +
index));
+ } else {
+ return
CompletableFuture.completedFuture(position);
+ }
+ });
+ }).thenCompose(position -> CompletableFuture.completedFuture(
+ new MessageIdImpl(position.getLedgerId(),
position.getEntryId(), partitionIndex)));
+ }
+
+ protected CompletableFuture<Long> findMessageIndexByPosition(Position
position, ManagedLedger managedLedger) {
+ CompletableFuture<Long> indexFuture = new CompletableFuture<>();
+ managedLedger.asyncReadEntry(position, new
AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ BrokerEntryMetadata brokerEntryMetadata =
+
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+ if (brokerEntryMetadata == null) {
+ indexFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED,
+ "Broker entry metadata is not present in the
message"));
+ } else {
+ long index = brokerEntryMetadata.getIndex();
+ if (index < 0) {
+ indexFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED,
+ "Invalid message index: " + index));
+ } else {
+ indexFuture.complete(index);
+ }
+ }
Review Comment:
You must release `entry` here.
I think you can add a common method for both `asyncReadEntry` and
`asyncFindPosition` to use. Here is an example:
```java
/**
 * Reads the broker entry metadata index from {@code entry} and always releases the entry.
 *
 * @param entry the entry to inspect; it is released before this method returns,
 *              on both the normal and the exceptional path
 * @return the index stored in the broker entry metadata, or {@code Optional.empty()} when the
 *         entry has no broker entry metadata or the metadata has no index
 * @throws IOException if the metadata cannot be parsed (for example, the buffer is corrupted)
 */
private static Optional<Long> getIndexAndRelease(Entry entry) throws IOException {
    try {
        final var brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
            return Optional.of(brokerEntryMetadata.getIndex());
        } else {
            return Optional.empty();
        }
    } catch (Throwable throwable) {
        throw new IOException(throwable);
    } finally {
        // The method name promises a release: do it on every path so no caller can leak the entry.
        entry.release();
    }
}
```
The implementation above always releases the entry and forces the caller to
handle two exceptional cases:
- `Optional.empty()` is returned, which indicates that the broker entry metadata
does not exist or that it has no index (your code does not take this into
consideration)
- The buffer is corrupted, in which case an `IOException` is thrown
But you can implement your own method if you care about the overhead of
`Optional` or don't want to use a checked exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]