codelipenghui commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r687385573



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -273,6 +274,78 @@ public void testGetLastMessageId() throws Exception {
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionName(subscription)
                 .subscribe();
-        consumer.getLastMessageId();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForIndividualMessage() 
throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long sendTime = System.currentTimeMillis();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            Assert.assertTrue(
+                    received.hasBrokerPublishTime() && 
received.getBrokerPublishTime().orElse(-1L) >= sendTime);
+            Assert.assertTrue(received.hasIndex() && 
received.getIndex().orElse(-1L) == i);
+        }
+        producer.close();
+        consumer.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws 
Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)

Review comment:
       The default batching delay is 1ms. It would be better to increase the 
batching delay to something like 1 minute and force-flush the producer, because 
if the test runs on a machine with limited resources, we might get only 1 
message in a batch.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,36 @@ public void release() {
         }
     }
 
+    @Override
+    public boolean hasBrokerPublishTime() {
+        return brokerEntryMetadata != null && 
brokerEntryMetadata.hasBrokerTimestamp();
+    }
+
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean hasIndex() {
+        return brokerEntryMetadata != null && brokerEntryMetadata.hasIndex();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+            if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof 
BatchMessageIdImpl) {
+                int batchIndex = ((BatchMessageIdImpl) 
messageId).getBatchIndex();
+                return Optional.of(brokerEntryMetadata.getIndex() - 
batchIndex);

Review comment:
       If the index is 10 and the batch size is 5, will we get 10, 9, 8, 7, 6 
for these 5 messages?

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -273,6 +274,78 @@ public void testGetLastMessageId() throws Exception {
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionName(subscription)
                 .subscribe();
-        consumer.getLastMessageId();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForIndividualMessage() 
throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long sendTime = System.currentTimeMillis();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            Assert.assertTrue(
+                    received.hasBrokerPublishTime() && 
received.getBrokerPublishTime().orElse(-1L) >= sendTime);
+            Assert.assertTrue(received.hasIndex() && 
received.getIndex().orElse(-1L) == i);
+        }
+        producer.close();
+        consumer.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws 
Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)

Review comment:
       I think we can send 5 messages and flush them, then send 5 more messages 
and flush again, to make sure we get 2 batch messages and 10 individual messages.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to