This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c9fba5f1f7e [fix][admin] Report earliest msg in partitioned backlog 
(#19465)
c9fba5f1f7e is described below

commit c9fba5f1f7edc6c22219d27af1b7a63ab2d17c44
Author: Elliot West <[email protected]>
AuthorDate: Thu Jun 15 04:43:24 2023 +0100

    [fix][admin] Report earliest msg in partitioned backlog (#19465)
    
    Co-authored-by: tison <[email protected]>
---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 16 ++++-
 .../org/apache/pulsar/client/admin/Topics.java     |  2 +
 .../policies/data/stats/SubscriptionStatsImpl.java | 12 ++++
 .../common/policies/data/stats/TopicStatsImpl.java | 12 ++++
 .../data/stats/SubscriptionStatsImplTest.java      | 82 ++++++++++++++++++++++
 .../policies/data/stats/TopicStatsImplTest.java    | 81 +++++++++++++++++++++
 6 files changed, 204 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 26449b69edd..c97544eb5aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static org.mockito.Mockito.spy;
@@ -36,6 +37,7 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1895,6 +1897,8 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
             .subscribe();
 
+        long start1 = 0;
+        long start2 = 0;
         @Cleanup
         Producer<byte[]> producer = client.newProducer()
             .topic(topic)
@@ -1902,6 +1906,12 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             .create();
 
         for (int i = 0; i < 10; i++) {
+            if (i == 0) {
+                start1 = Clock.systemUTC().millis();
+            }
+            if (i == 5) {
+                start2 = Clock.systemUTC().millis();
+            }
             if (i > 4) {
                 producer.newMessage()
                     .value("message-1".getBytes(StandardCharsets.UTF_8))
@@ -1912,22 +1922,26 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             }
         }
         // wait until the message add to delay queue.
+        long finalStart1 = start1;
         Awaitility.await().untilAsserted(() -> {
             TopicStats topicStats = admin.topics().getPartitionedStats(topic, 
false, true, true, true);
             
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
             
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
             
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 5);
+            
assertTrue(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog()
 >= finalStart1);
         });
 
         for (int i = 0; i < 5; i++) {
             consumer.acknowledge(consumer.receive());
         }
         // Wait the ack send.
-        Awaitility.await().untilAsserted(() -> {
+        long finalStart2 = start2;
+        Awaitility.await().timeout(1, MINUTES).untilAsserted(() -> {
             TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, 
false, true, true, true);
             
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
             
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
             
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 0);
+            
assertTrue(topicStats2.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog()
 >= finalStart2);
         });
 
     }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index f599e2566bf..156d67e4e58 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1361,6 +1361,8 @@ public interface Topics {
      *            Set to true to get precise backlog, Otherwise get imprecise 
backlog.
      * @param subscriptionBacklogSize
      *            Whether to get backlog size for each subscription.
+     * @param getEarliestTimeInBacklog
+     *            Whether to get the earliest time in backlog.
      * @return a future that can be used to track when the partitioned topic 
statistics are returned
      */
     CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index cfc6cab9e11..bed8aabf27d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -177,6 +177,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         consumersAfterMarkDeletePosition.clear();
         nonContiguousDeletedMessagesRanges = 0;
         nonContiguousDeletedMessagesRangesSerializedSize = 0;
+        earliestMsgPublishTimeInBacklog = 0L;
         delayedMessageIndexSizeInBytes = 0;
         subscriptionProperties.clear();
         filterProcessedMsgCount = 0;
@@ -221,6 +222,17 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
         
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
         this.nonContiguousDeletedMessagesRanges += 
stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += 
stats.nonContiguousDeletedMessagesRangesSerializedSize;
+        if (this.earliestMsgPublishTimeInBacklog != 0 && 
stats.earliestMsgPublishTimeInBacklog != 0) {
+            this.earliestMsgPublishTimeInBacklog = Math.min(
+                    this.earliestMsgPublishTimeInBacklog,
+                    stats.earliestMsgPublishTimeInBacklog
+            );
+        } else {
+            this.earliestMsgPublishTimeInBacklog = Math.max(
+                    this.earliestMsgPublishTimeInBacklog,
+                    stats.earliestMsgPublishTimeInBacklog
+            );
+        }
         this.delayedMessageIndexSizeInBytes += 
stats.delayedMessageIndexSizeInBytes;
         this.subscriptionProperties.putAll(stats.subscriptionProperties);
         this.filterProcessedMsgCount += stats.filterProcessedMsgCount;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 7a48df89b8b..e50620fb223 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -216,6 +216,7 @@ public class TopicStatsImpl implements TopicStats {
         this.lastOffloadFailureTimeStamp = 0;
         this.lastOffloadSuccessTimeStamp = 0;
         this.publishRateLimitedTimes = 0L;
+        this.earliestMsgPublishTimeInBacklogs = 0L;
         this.delayedMessageIndexSizeInBytes = 0;
         this.compaction.reset();
         this.ownerBroker = null;
@@ -315,6 +316,17 @@ public class TopicStatsImpl implements TopicStats {
                 }
             }
         }
+        if (earliestMsgPublishTimeInBacklogs != 0 && ((TopicStatsImpl) 
ts).earliestMsgPublishTimeInBacklogs != 0) {
+            earliestMsgPublishTimeInBacklogs = Math.min(
+                    earliestMsgPublishTimeInBacklogs,
+                    ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs
+            );
+        } else {
+            earliestMsgPublishTimeInBacklogs = Math.max(
+                    earliestMsgPublishTimeInBacklogs,
+                    ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs
+            );
+        }
         return this;
     }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java
new file mode 100644
index 00000000000..8a4b5da9edd
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+
+public class SubscriptionStatsImplTest {
+
+    @Test
+    public void testReset() {
+        SubscriptionStatsImpl stats = new SubscriptionStatsImpl();
+        stats.earliestMsgPublishTimeInBacklog = 1L;
+        stats.reset();
+        assertEquals(stats.earliestMsgPublishTimeInBacklog, 0L);
+
+    }
+
+    @Test
+    public void testAdd_EarliestMsgPublishTimeInBacklogs_Earliest() {
+        SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
+        stats1.earliestMsgPublishTimeInBacklog = 10L;
+
+        SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
+        stats2.earliestMsgPublishTimeInBacklog = 20L;
+
+        SubscriptionStatsImpl aggregate = stats1.add(stats2);
+        assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 10L);
+    }
+
+    @Test
+    public void testAdd_EarliestMsgPublishTimeInBacklogs_First0() {
+        SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
+        stats1.earliestMsgPublishTimeInBacklog = 0L;
+
+        SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
+        stats2.earliestMsgPublishTimeInBacklog = 20L;
+
+        SubscriptionStatsImpl aggregate = stats1.add(stats2);
+        assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 20L);
+    }
+
+    @Test
+    public void testAdd_EarliestMsgPublishTimeInBacklogs_Second0() {
+        SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
+        stats1.earliestMsgPublishTimeInBacklog = 10L;
+
+        SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
+        stats2.earliestMsgPublishTimeInBacklog = 0L;
+
+        SubscriptionStatsImpl aggregate = stats1.add(stats2);
+        assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 10L);
+    }
+
+    @Test
+    public void testAdd_EarliestMsgPublishTimeInBacklogs_Zero() {
+        SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
+        stats1.earliestMsgPublishTimeInBacklog = 0L;
+
+        SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
+        stats2.earliestMsgPublishTimeInBacklog = 0L;
+
+        SubscriptionStatsImpl aggregate = stats1.add(stats2);
+        assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 0L);
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java
new file mode 100644
index 00000000000..09cef4c4d0f
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+
+public class TopicStatsImplTest {
+
+    @Test
+    public void testReset() {
+        TopicStatsImpl stats = new TopicStatsImpl();
+        stats.earliestMsgPublishTimeInBacklogs = 1L;
+        stats.reset();
+        assertEquals(stats.earliestMsgPublishTimeInBacklogs, 0L);
+    }
+
+    @Test
+    public void testAdd_EarliestMsgPublishTimeInBacklogs_Earliest() {
+        TopicStatsImpl stats1 = new TopicStatsImpl();
+        stats1.earliestMsgPublishTimeInBacklogs = 10L;
+
+        TopicStatsImpl stats2 = new TopicStatsImpl();
+        stats2.earliestMsgPublishTimeInBacklogs = 20L;
+
+        TopicStatsImpl aggregate = stats1.add(stats2);
+        assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 10L);
+    }
+
+    @Test
+    public void testAdd_EarliestMsgPublishTimeInBacklogs_First0() {
+        TopicStatsImpl stats1 = new TopicStatsImpl();
+        stats1.earliestMsgPublishTimeInBacklogs = 0L;
+
+        TopicStatsImpl stats2 = new TopicStatsImpl();
+        stats2.earliestMsgPublishTimeInBacklogs = 20L;
+
+        TopicStatsImpl aggregate = stats1.add(stats2);
+        assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 20L);
+    }
+
+    @Test
+    public void testAdd_EarliestMsgPublishTimeInBacklogs_Second0() {
+        TopicStatsImpl stats1 = new TopicStatsImpl();
+        stats1.earliestMsgPublishTimeInBacklogs = 10L;
+
+        TopicStatsImpl stats2 = new TopicStatsImpl();
+        stats2.earliestMsgPublishTimeInBacklogs = 0L;
+
+        TopicStatsImpl aggregate = stats1.add(stats2);
+        assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 10L);
+    }
+
+    @Test
+    public void testAdd_EarliestMsgPublishTimeInBacklogs_Zero() {
+        TopicStatsImpl stats1 = new TopicStatsImpl();
+        stats1.earliestMsgPublishTimeInBacklogs = 0L;
+
+        TopicStatsImpl stats2 = new TopicStatsImpl();
+        stats2.earliestMsgPublishTimeInBacklogs = 0L;
+
+        TopicStatsImpl aggregate = stats1.add(stats2);
+        assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 0L);
+    }
+}
\ No newline at end of file

Reply via email to