liangyepianzhou commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2128055455
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java:
##########
@@ -420,4 +426,93 @@ public void testManagedLedgerTotalSize() throws Exception {
cursor.close();
}
+
+ @Test
+ public void testGetMessageIdByIndex() throws Exception {
+ // 1. test no partitioned topic
+ final String topicName = newTopicName();
+ admin.topics().createNonPartitionedTopic(topicName);
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ MessageIdImpl messageId = (MessageIdImpl) producer.send("test");
+ Message<byte[]>
+ message =
+ admin.topics().getMessagesById(topicName,
messageId.getLedgerId(), messageId.getEntryId()).get(0);
+ long index = message.getIndex().get();
+ MessageIdImpl messageIdByIndex = (MessageIdImpl)
admin.topics().getMessageIdByIndex(topicName, index);
+ Assert.assertEquals(messageIdByIndex, messageId);
+
+ // 2. test partitioned topic
+ final String topicName2 = newTopicName();
+ final String partitionedTopicName = topicName2 + "-partition-" + 0;
+ admin.topics().createPartitionedTopic(topicName2, 10);
+ @Cleanup
+ Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName2)
+ .enableBatching(false)
+ .create();
+
+ MessageIdImpl messageId2 = null;
+ for (int i = 0; i < 200; i++) {
+ messageId2 = (MessageIdImpl) producer2.send("test" + i);
+ if (messageId2.getPartitionIndex() == 0) {
+ break;
+ }
+ }
+ Message<byte[]>
+ message2 = admin.topics().getMessagesById(partitionedTopicName,
+ messageId2.getLedgerId(), messageId2.getEntryId()).get(0);
+ long index2 = message2.getIndex().get();
+ // 2.1 test partitioned topic name with partition index
+ MessageIdImpl messageIdByIndex2 =
+ (MessageIdImpl)
admin.topics().getMessageIdByIndex(partitionedTopicName, index2);
+ Assert.assertEquals(messageIdByIndex2, messageId2);
+ // 2.2 test partitioned topic name without partition index
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName2, index2),
+ PulsarAdminException.class, NotAllowedException.class);
+
+ // 3. test invalid index
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName, -1),
+ PulsarAdminException.class, NotFoundException.class);
+
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName, 100000),
+ PulsarAdminException.class, NotFoundException.class);
+ }
+
+ @Test
+ public void testGetMessageIdByIndexForEmptyTopic() throws
PulsarAdminException {
+ final String topicName = newTopicName();
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName, 0),
+ PulsarAdminException.class, NotFoundException.class);
+ }
+
+ @Test
+ public void testGetMessageIdByIndexOutOfIndex() throws
PulsarAdminException, PulsarClientException {
+ final String topicName = newTopicName();
+ admin.topics().createNonPartitionedTopic(topicName);
+ @Cleanup
+ final Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+ for (int i = 0; i < 100; i++) {
+ producer.send("msg-" + i);
+ }
+
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName, 1000),
+ PulsarAdminException.class, NotFoundException.class);
+ }
+
+ private void assertThrowsWithCause(Executable executable,
Review Comment:
The `run` method of `Runnable` does not declare any checked exceptions, while
our code throws `PulsarAdminException` (a checked exception).
Therefore, the code would fail at compile time with `Runnable`: the lambda
expression would throw a checked exception from `run`, and a lambda body may
only throw checked exceptions that the functional interface's method declares
in its `throws` clause.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]