This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8350352152ce7c64cae033b51770ad8756688089 Author: Xiangying Meng <[email protected]> AuthorDate: Mon Jun 23 14:02:09 2025 +0800 [feat][admin] PIP-415: Support getting message ID by index (#24222) PIP:https://github.com/apache/pulsar/pull/24220 we can now obtain the offset of a message by its message id: 1. Get the message by id using `get-message-by-id` cmd 2. Get the index of the message using `Message.getIndex()` But we cannot obtain the message id by offset. Then we need to add a new API to get the message id by offset. Add a new http API to retrieve the message ID by offset. We propose to add a new API to retrieve the message ID by offset, enabling us to cache the mapping between message ID and offset. This will allow us to use offsets for seek and acknowledgment operations when consuming messages through the standardized API. (cherry picked from commit ed28c2166cb68955d00b28345049709196fd7083) --- merged_prs.txt | 6 + .../broker/admin/impl/PersistentTopicsBase.java | 128 +++++++++++++++++++++ .../pulsar/broker/admin/v2/PersistentTopics.java | 33 ++++++ .../broker/service/BrokerEntryMetadataE2ETest.java | 96 ++++++++++++++++ .../org/apache/pulsar/client/admin/Topics.java | 57 +++++++++ .../pulsar/client/admin/internal/TopicsImpl.java | 26 +++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +- .../org/apache/pulsar/admin/cli/CmdTopics.java | 18 +++ run.py | 7 ++ 9 files changed, 373 insertions(+), 1 deletion(-) diff --git a/merged_prs.txt b/merged_prs.txt new file mode 100644 index 00000000000..b109e2e1d2f --- /dev/null +++ b/merged_prs.txt @@ -0,0 +1,6 @@ +218 +169 +27 +9 +3 +2 diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a48df69d1d1..27d605f81d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5344,4 +5344,132 @@ public class PersistentTopicsBase extends AdminResource { return null; })); } + + protected CompletableFuture<MessageId> internalGetMessageIDByIndexAsync(Long index, boolean authoritative) { + if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) { + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "GetMessageIDByIndex is not allowed when broker entry metadata is disabled")); + } + if (index == null || index < 0) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "Invalid message index: " + index)); + } + int partitionIndex = topicName.getPartitionIndex(); + CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES); + return future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> { + if (topicName.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } else { + return getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenAccept(topicMetadata -> { + if (topicMetadata.partitions > 0) { + log.warn("[{}] Not supported getMessageIdByIndex operation on " + + "partitioned-topic {}", clientAppId(), topicName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "GetMessageIDByIndex is not allowed on partitioned-topic"); + } + }); + } + }).thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(topic -> { + if (!(topic instanceof PersistentTopic persistentTopic)) { + log.error("[{}] Get message id by index on a non-persistent topic {} is not allowed", + clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED, + "Get message id by 
index on a non-persistent topic is not allowed")); + } + ManagedLedger managedLedger = persistentTopic.getManagedLedger(); + Position lastPosition = managedLedger.getLastConfirmedEntry(); + Position firstPosition = managedLedger.getFirstPosition(); + if (firstPosition == null || lastPosition == null || firstPosition.equals(lastPosition)) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "No messages found in topic " + topicName)); + } + return findMessageIndexByPosition( + PositionFactory.create(firstPosition.getLedgerId(), 0), + managedLedger) + .thenCompose(firstIndex -> { + if (index < firstIndex) { + return CompletableFuture.completedFuture(PositionFactory.EARLIEST); + } else { + return managedLedger.asyncFindPosition(entry -> { + try { + Long messageIndex = getIndexFromEntry(entry); + if (messageIndex == null) { + return false; // Skip messages without index + } else { + // If the message index is less than the requested index, + // we continue searching + return messageIndex < index; + } + } catch (Throwable e) { + log.error("Error deserialize message for message position find", e); + return false; + } finally { + entry.release(); + } + }); + } + }).thenCompose(position -> { + if (position == null || position.compareTo(lastPosition) > 0) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "Message not found for index " + index)); + } else { + return CompletableFuture.completedFuture(position); + } + }); + }).thenApply(position -> PositionFactory.EARLIEST.equals(position) ? MessageId.earliest + : new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex)); + } + + protected CompletableFuture<Long> findMessageIndexByPosition(Position position, ManagedLedger managedLedger) { + CompletableFuture<Long> indexFuture = new CompletableFuture<>(); + managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + try { + Long index = 
getIndexFromEntry(entry); + if 
(index == null) { + indexFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, + "Broker entry metadata is not present in the message")); + } else if (index < 0) { + indexFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, + "Invalid message index: " + index)); + } else { + indexFuture.complete(index); + } + } catch (Throwable e) { + indexFuture.completeExceptionally(new RestException(Status.INTERNAL_SERVER_ERROR, + "Failed to get index from entry: " + e.getMessage())); + } finally { + entry.release(); + } + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + log.error("[{}] Failed to read position {} on topic {}", + clientAppId(), position, topicName, exception); + indexFuture.completeExceptionally(exception); + } + }, null); + return indexFuture; + } + + + private static Long getIndexFromEntry(Entry entry) { + final var brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); + if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { + return brokerEntryMetadata.getIndex(); + } else { + return null; + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 16b07afd1b7..5ee04d8817f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -5095,5 +5095,38 @@ public class PersistentTopics extends PersistentTopicsBase { }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex") + @ApiOperation(hidden = true, value = "Get Message ID by index.", + notes = "If the specified index is a system message, " + + "it will return the message id of the later message.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the 
namespace of this topic"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace or partitioned topic does not exist, " + + "or the index is invalid"), + @ApiResponse(code = 405, message = "The topic is not a persistent topic, or it is a partitioned topic"), + @ApiResponse(code = 412, message = "Broker entry metadata is not enabled on the broker"), + }) + public void getMessageIDByIndex(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("index") Long index, + @QueryParam("authoritative") @DefaultValue("false") + boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + internalGetMessageIDByIndexAsync(index, authoritative) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get message id by index for topic {}, index {}", + clientAppId(), topicName, index, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index d7dbe34b8fd..4966330c33c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -19,25 +19,32 @@ package org.apache.pulsar.broker.service; import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + import java.time.Duration; import java.util.ArrayList; import java.util.List; import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import javax.ws.rs.NotAllowedException; +import javax.ws.rs.NotFoundException; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.util.FutureUtil; +import org.assertj.core.api.ThrowableAssert; import org.assertj.core.util.Sets; import org.awaitility.Awaitility; import org.testng.Assert; @@ -419,4 +426,93 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase { cursor.close(); } + + @Test + public void testGetMessageIdByIndex() throws Exception { + // 1. test no partitioned topic + final String topicName = newTopicName(); + admin.topics().createNonPartitionedTopic(topicName); + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + MessageIdImpl messageId = (MessageIdImpl) producer.send("test"); + Message<byte[]> + message = + admin.topics().getMessagesById(topicName, messageId.getLedgerId(), messageId.getEntryId()).get(0); + long index = message.getIndex().get(); + MessageIdImpl messageIdByIndex = (MessageIdImpl) admin.topics().getMessageIdByIndex(topicName, index); + Assert.assertEquals(messageIdByIndex, messageId); + + // 2. 
test partitioned topic + final String topicName2 = newTopicName(); + final String partitionedTopicName = topicName2 + "-partition-" + 0; + admin.topics().createPartitionedTopic(topicName2, 10); + @Cleanup + Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING) + .topic(topicName2) + .enableBatching(false) + .create(); + + MessageIdImpl messageId2 = null; + for (int i = 0; i < 200; i++) { + messageId2 = (MessageIdImpl) producer2.send("test" + i); + if (messageId2.getPartitionIndex() == 0) { + break; + } + } + Message<byte[]> + message2 = admin.topics().getMessagesById(partitionedTopicName, + messageId2.getLedgerId(), messageId2.getEntryId()).get(0); + long index2 = message2.getIndex().get(); + // 2.1 test partitioned topic name with partition index + MessageIdImpl messageIdByIndex2 = + (MessageIdImpl) admin.topics().getMessageIdByIndex(partitionedTopicName, index2); + Assert.assertEquals(messageIdByIndex2, messageId2); + // 2.2 test partitioned topic name without partition index + assertThrowsWithCause(() -> admin.topics().getMessageIdByIndex(topicName2, index2), + PulsarAdminException.class, NotAllowedException.class); + + // 3. 
test invalid index + assertThrowsWithCause(() -> admin.topics().getMessageIdByIndex(topicName, -1), + PulsarAdminException.class, NotFoundException.class); + + assertThrowsWithCause(() -> admin.topics().getMessageIdByIndex(topicName, 100000), + PulsarAdminException.class, NotFoundException.class); + } + + @Test + public void testGetMessageIdByIndexForEmptyTopic() throws PulsarAdminException { + final String topicName = newTopicName(); + admin.topics().createNonPartitionedTopic(topicName); + + assertThrowsWithCause(() -> admin.topics().getMessageIdByIndex(topicName, 0), + PulsarAdminException.class, NotFoundException.class); + } + + @Test + public void testGetMessageIdByIndexOutOfIndex() throws PulsarAdminException, PulsarClientException { + final String topicName = newTopicName(); + admin.topics().createNonPartitionedTopic(topicName); + @Cleanup + final Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create(); + for (int i = 0; i < 100; i++) { + producer.send("msg-" + i); + } + + assertThrowsWithCause(() -> admin.topics().getMessageIdByIndex(topicName, 1000), + PulsarAdminException.class, NotFoundException.class); + } + + private void assertThrowsWithCause(ThrowableAssert.ThrowingCallable executable, + Class<? extends Throwable> expectedException, + Class<? 
extends Throwable> expectedCause) { + assertThatThrownBy(executable) + .isInstanceOf(expectedException) + .hasRootCauseInstanceOf(expectedCause); + } + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index c681bd1a7bc..a2fcd60deb5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -4558,4 +4558,61 @@ public interface Topics { default CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic) { return createShadowTopicAsync(shadowTopic, sourceTopic, null); } + + /** + * Get the message id by index. If the index points to a system message, return the first user message following it; + * if the specified message has expired and been deleted, return MessageId.Earliest. + * The messages without entry metadata will be skipped, and the next matched message whose index >= the specified + * index will be returned. + * When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal + * storage unit for messages in Pulsar's persistence layer). + * If message batching is enabled, a single entry may contain multiple messages with distinct indexes. + * Example Scenario (partition with 2 entries): + * | Entry | Ledger ID | Entry ID | Index | Messages | + * | :--- | ---: | ---: | ---: | ---: | + * | A | 0 | 0 | 2 | 0,1,2 | + * | B | 0 | 1 | 4 | 3,4 | + * Param with indexes 0,1,2 or 3,4 will return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A). + * @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) + * or the original topic name for non-partitioned topics. + * @param index the index of a message + * @return the message id of the message. 
+ * @throws NotAuthorizedException (HTTP 403 Forbidden) Client lacks permissions to access the topic/namespace. + * @throws NotFoundException (HTTP 404 Not Found) Topic/namespace does not exist, or invalid index. + * @throws PulsarAdminException (HTTP 405 Method Not Allowed) Topic is not persistent or is partitioned. + * @throws PreconditionFailedException (HTTP 412 Precondition Failed) Broker entry metadata is disabled. + * @throws PulsarAdminException For other errors (e.g., HTTP 500 Internal Server Error). + */ + MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException; + + + /** + * Get the message id by index asynchronously. If the index points to a system message, return the first user + * message following it; if the specified message has expired and been deleted, return MessageId.Earliest. + * The messages without entry metadata will be skipped, and the next matched message whose index >= the specified + * index will be returned. + * When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal + * storage unit for messages in Pulsar's persistence layer). + * If message batching is enabled, a single entry may contain multiple messages with distinct indexes. + * Example Scenario (partition with 2 entries): + * | Entry | Ledger ID | Entry ID | Index | Messages | + * | :--- | ---: | ---: | ---: | ---: | + * | A | 0 | 0 | 2 | 0,1,2 | + * | B | 0 | 1 | 4 | 3,4 | + * Param with indexes 0,1,2 or 3,4 will return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A). + * @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or + * the original topic name for non-partitioned topics. + * @param index the index of a message + * @return a {@link CompletableFuture} that completes with the message id of the message. 
++ * The future may complete exceptionally with: + * <ul> + * <li>{@link NotAuthorizedException} (HTTP 403) Permission denied for topic/namespace access.</li> + * <li>{@link NotFoundException} (HTTP 404) Topic/namespace does not exist or invalid index.</li> + * <li>{@link PulsarAdminException} (HTTP 405) Topic is not persistent or is partitioned.</li> + * <li>{@link PreconditionFailedException} (HTTP 412) Broker entry metadata is not enabled.</li> + * <li>{@link PulsarAdminException} (HTTP 307) Redirect required to the correct broker.</li> + * <li>{@link PulsarAdminException} Other errors (e.g., HTTP 500).</li> + * </ul> + */ + CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9c4a6eef753..8a1e0e0f839 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2819,5 +2819,31 @@ public class TopicsImpl extends BaseResource implements Topics { }); } + @Override + public MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException { + return sync(() -> getMessageIdByIndexAsync(topicName, index)); + } + + @Override + public CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index) { + final CompletableFuture<MessageId> messageIdCompletableFuture = new CompletableFuture<>(); + TopicName topic = validateTopic(topicName); + WebTarget path = topicPath(topic, "getMessageIdByIndex"); + path = path.queryParam("index", index); + asyncGetRequest(path, new InvocationCallback<MessageIdImpl>(){ + + @Override + public void completed(MessageIdImpl messageId) { + messageIdCompletableFuture.complete(messageId); + } + + @Override + public void 
+failed(Throwable throwable) { + messageIdCompletableFuture.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return messageIdCompletableFuture; + } + private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 44035d396fa..675eca867a3 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -2176,7 +2176,8 @@ public class PulsarAdminToolTest { cmdTopics.run(split("get-shadow-source persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getShadowSource("persistent://myprop/clust/ns1/ds1"); - + cmdTopics.run(split("get-message-id-by-index persistent://myprop/clust/ns1/ds1 -i 0")); + verify(mockTopics).getMessageIdByIndex("persistent://myprop/clust/ns1/ds1", 0); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index ca15e111390..b8ed4c30d87 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -275,6 +275,8 @@ public class CmdTopics extends CmdBase { addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced()); addCommand("trim-topic", new TrimTopic()); + + addCommand("get-message-id-by-index", new GetMessageIdByIndex()); } @Command(description = "Get the list of topics under a namespace.") @@ -3058,4 +3060,20 @@ public class CmdTopics extends CmdBase { getAdmin().topics().trimTopic(topic); } } + + @Command(description = "Get message id by index") + private class GetMessageIdByIndex extends CliCommand { + + @Parameters(description = 
+"persistent://tenant/namespace/topic", arity = "1") + private String topicName; + + @Option(names = { "--index", "-i" }, description = "Index to get message id for the topic", required = true) + private Long index; + + @Override + void run() throws Exception { + String topic = validateTopicName(topicName); + print(getAdmin().topics().getMessageIdByIndex(topic, index)); + } + } } diff --git a/run.py b/run.py new file mode 100755 index 00000000000..38a8f501f73 --- /dev/null +++ b/run.py @@ -0,0 +1,7 @@ +# Impact index algorithm (weights are adjustable) +def calculate_impact(pr): + return (pr['comments']*0.3 + pr['reviews']*0.5 + pr['derived_commits']*1.2) * (1 + 0.1*pr['years_active']) + +# Use local git data instead of the API; run this pipeline in a shell, it is not Python: +# git log --merges --grep "Merge pull request #" --format="%H %s" | awk '{print $5}' | cut -c2- > merged_prs.txt +
