liangyepianzhou commented on code in PR #24222:
URL: https://github.com/apache/pulsar/pull/24222#discussion_r2126361742
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java:
##########
@@ -420,4 +427,109 @@ public void testManagedLedgerTotalSize() throws Exception
{
cursor.close();
}
+
+ @Test
+ public void testGetMessageIdByIndex() throws Exception {
+ // 1. test no partitioned topic
+ final String topicName = newTopicName();
+ admin.topics().createNonPartitionedTopic(topicName);
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ MessageIdImpl messageId = (MessageIdImpl) producer.send("test");
+ Message<byte[]>
+ message =
+ admin.topics().getMessagesById(topicName,
messageId.getLedgerId(), messageId.getEntryId()).get(0);
+ long index = message.getIndex().get();
+ MessageIdImpl messageIdByIndex = (MessageIdImpl)
admin.topics().getMessageIdByIndex(topicName, index);
+ Assert.assertEquals(messageIdByIndex, messageId);
+
+ // 2. test partitioned topic
+ final String topicName2 = newTopicName();
+ final String partitionedTopicName = topicName2 + "-partition-" + 0;
+ admin.topics().createPartitionedTopic(topicName2, 10);
+ @Cleanup
+ Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName2)
+ .enableBatching(false)
+ .create();
+
+ MessageIdImpl messageId2 = null;
+ for (int i = 0; i < 200; i++) {
+ messageId2 = (MessageIdImpl) producer2.send("test" + i);
+ if (messageId2.getPartitionIndex() == 0) {
+ break;
+ }
+ }
+ Message<byte[]>
+ message2 = admin.topics().getMessagesById(partitionedTopicName,
+ messageId2.getLedgerId(), messageId2.getEntryId()).get(0);
+ long index2 = message2.getIndex().get();
+ // 2.1 test partitioned topic name with partition index
+ MessageIdImpl messageIdByIndex2 =
+ (MessageIdImpl)
admin.topics().getMessageIdByIndex(partitionedTopicName, index2);
+ Assert.assertEquals(messageIdByIndex2, messageId2);
+ // 2.2 test partitioned topic name without partition index
+ PulsarAdminException e =
Assertions.assertThrows(PulsarAdminException.class, () ->
+ admin.topics().getMessageIdByIndex(topicName2, index2)
+ );
+ assertExceptionChainContains(e, NotAllowedException.class);
+
+ // 3. test invalid index
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName, -1),
+ PulsarAdminException.class, NotFoundException.class);
+
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName, 100000),
+ PulsarAdminException.class, NotFoundException.class);
+ }
+
+ @Test
+ public void testGetMessageIdByIndexForEmptyTopic() throws
PulsarAdminException {
+ final String topicName = newTopicName();
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName, 0),
+ PulsarAdminException.class, NotFoundException.class);
+ }
+
+ @Test
+ public void testGetMessageIdByIndexOutOfIndex() throws
PulsarAdminException, PulsarClientException {
+ final String topicName = newTopicName();
+ admin.topics().createNonPartitionedTopic(topicName);
+ @Cleanup
+ final Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+ for (int i = 0; i < 100; i++) {
+ producer.send("msg-" + i);
+ }
+
+ assertThrowsWithCause(() ->
admin.topics().getMessageIdByIndex(topicName, 1000),
+ PulsarAdminException.class, NotFoundException.class);
+ }
+
+ private void assertExceptionChainContains(Throwable thrown, Class<?
extends Throwable> expectedCauseType) {
+ Throwable cause = thrown;
+ boolean found = false;
+
+ while (cause != null) {
+ if (expectedCauseType.isInstance(cause)) {
+ found = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ Assert.assertTrue(found);
+ }
+
+ private void assertThrowsWithCause(Executable executable,
Review Comment:
While ChatGPT's provided code wasn't directly applicable to our scenario, it
pointed us in the right direction. After testing, confirmed the following
implementation works:
```java
private void assertThrowsWithCause(Executable executable,
Class<? extends Throwable>
expectedException,
Class<? extends Throwable> expectedCause) {
assertThatThrownBy(executable::execute)
.isInstanceOf(expectedException)
.getCause() // First layer: ResponseProcessingException
.getCause() // Second layer: NotFoundException
.isInstanceOf(expectedCause);
}
```
or
```java
private void assertThrowsWithCause(Executable executable,
Class<? extends Throwable>
expectedException,
Class<? extends Throwable> expectedCause)
{
assertThatThrownBy(executable::execute)
.isInstanceOf(expectedException)
.hasRootCauseInstanceOf(expectedCause);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]