This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f80cd5f658 [fix][test] Fix flaky ReplicatorTest.testResumptionAfterBacklogRelaxed (#24904)
3f80cd5f658 is described below

commit 3f80cd5f6587df457dfa83aca437abb659284c0a
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Oct 28 17:58:41 2025 +0200

    [fix][test] Fix flaky ReplicatorTest.testResumptionAfterBacklogRelaxed (#24904)
---
 .../pulsar/broker/service/ReplicatorTest.java      | 135 ++++++++++-----------
 1 file changed, 64 insertions(+), 71 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 3bcfcee8e2a..6e481c8eaff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -104,6 +104,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -972,6 +973,10 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertTrue(remoteClusters.contains("r1"));
     }
 
+    @DataProvider(name = "retentionPolicies")
+    public static Object[][] retentionPolicies() {
+        return new Object[][] { { RetentionPolicy.producer_exception }, { 
RetentionPolicy.producer_request_hold } };
+    }
 
     /**
      * Issue #199
@@ -982,91 +987,79 @@ public class ReplicatorTest extends ReplicatorTestBase {
      * @throws Exception
      */
 
-    @Test(timeOut = 60000, priority = -1)
-    public void testResumptionAfterBacklogRelaxed() throws Exception {
-        List<RetentionPolicy> policies = new ArrayList<>();
-        policies.add(RetentionPolicy.producer_exception);
-        policies.add(RetentionPolicy.producer_request_hold);
-
-        for (RetentionPolicy policy : policies) {
-            // Use 1Mb quota by default
-            admin1.namespaces().setBacklogQuota("pulsar/ns1", 
BacklogQuota.builder()
-                    .limitSize(1 * 1024 * 1024)
-                    .retentionPolicy(policy)
-                    .build());
-            Thread.sleep(200);
-
-            TopicName dest = TopicName
-                    
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy));
-
-            // Producer on r1
-            @Cleanup
-            MessageProducer producer1 = new MessageProducer(url1, dest);
+    @Test(timeOut = 60000, priority = -1, dataProvider = "retentionPolicies")
+    public void testResumptionAfterBacklogRelaxed(RetentionPolicy policy) 
throws Exception {
+        // create a unique namespace for this test case to avoid flakiness
+        String namespace = 
newUniqueName("pulsar/testResumptionAfterBacklogRelaxed");
+        Policies policies = new Policies();
+        policies.backlog_quota_map = 
Map.of(BacklogQuota.BacklogQuotaType.destination_storage, BacklogQuota.builder()
+                // Use 1Mb quota by default
+                .limitSize(1 * 1024 * 1024)
+                .retentionPolicy(policy)
+                .build());
+        policies.replication_clusters = Set.of("r1", "r2");
+        admin1.namespaces().createNamespace(namespace, policies);
+
+        TopicName dest = TopicName.get("persistent://" + namespace + "/" + 
policy);
 
-            // Consumer on r2
-            @Cleanup
-            MessageConsumer consumer2 = new MessageConsumer(url2, dest);
+        // Producer on r1
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest);
 
-            // Replicator for r1 -> r2
-            PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService()
-                    .getTopicReference(dest.toString()).get();
-            Replicator replicator = topic.getPersistentReplicator("r2");
+        // Consumer on r2
+        @Cleanup
+        MessageConsumer consumer2 = new MessageConsumer(url2, dest);
 
-            // Produce 1 message in r1. This message will be replicated 
immediately into r2 and it will become part of
-            // local backlog
-            producer1.produce(1);
+        // Replicator for r1 -> r2
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
+                .getTopicReference(dest.toString()).get();
+        Replicator replicator = topic.getPersistentReplicator("r2");
 
-            Thread.sleep(500);
+        // Produce 1 message in r1. This message will be replicated 
immediately into r2 and it will become part of
+        // local backlog
+        producer1.produce(1);
 
-            // Restrict backlog quota limit to 1 byte to stop replication
-            admin1.namespaces().setBacklogQuota("pulsar/ns1", 
BacklogQuota.builder()
-                    .limitSize(1)
-                    .retentionPolicy(policy)
-                    .build());
+        Awaitility.await().untilAsserted(() -> 
assertEquals(replicator.computeStats().replicationBacklog, 0));
+        var attributes = Attributes.of(
+                OpenTelemetryAttributes.PULSAR_DOMAIN, 
dest.getDomain().value(),
+                OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(),
+                OpenTelemetryAttributes.PULSAR_NAMESPACE, dest.getNamespace(),
+                OpenTelemetryAttributes.PULSAR_TOPIC, 
dest.getPartitionedTopicName(),
+                
OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2
+        );
+        var metrics = metricReader1.collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
+        assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
 
-            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+        // Restrict backlog quota limit to 1 byte to stop replication
+        admin1.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
+                .limitSize(1)
+                .retentionPolicy(policy)
+                .build());
 
-            assertEquals(replicator.computeStats().replicationBacklog, 0);
-            var attributes = Attributes.of(
-                    OpenTelemetryAttributes.PULSAR_DOMAIN, 
dest.getDomain().value(),
-                    OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(),
-                    OpenTelemetryAttributes.PULSAR_NAMESPACE, 
dest.getNamespace(),
-                    OpenTelemetryAttributes.PULSAR_TOPIC, 
dest.getPartitionedTopicName(),
-                    
OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2
-            );
-            var metrics = metricReader1.collectAllMetrics();
-            assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
-            assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
+        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
 
-            // Next message will not be replicated, because r2 has reached the 
quota
-            producer1.produce(1);
 
-            Thread.sleep(500);
+        // Next message will not be replicated, because r2 has reached the 
quota
+        producer1.produce(1);
 
-            assertEquals(replicator.computeStats().replicationBacklog, 1);
-            metrics = metricReader1.collectAllMetrics();
-            assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 1);
-            assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes,
-                    aDouble -> assertThat(aDouble).isPositive());
+        Awaitility.await().untilAsserted(() -> 
assertEquals(replicator.computeStats().replicationBacklog, 1));
 
-            // Consumer will now drain 1 message and the replication backlog 
will be cleared
-            consumer2.receive(1);
+        metrics = metricReader1.collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 1);
+        assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes,
+                aDouble -> assertThat(aDouble).isPositive());
 
-            // Wait until the 2nd message got delivered to consumer
-            consumer2.receive(1);
+        // Consumer will now drain 1 message and the replication backlog will 
be cleared
+        consumer2.receive(1);
 
-            int retry = 10;
-            for (int i = 0; i < retry && 
replicator.computeStats().replicationBacklog > 0; i++) {
-                if (i != retry - 1) {
-                    Thread.sleep(100);
-                }
-            }
+        // Wait until the 2nd message got delivered to consumer
+        consumer2.receive(1);
 
-            assertEquals(replicator.computeStats().replicationBacklog, 0);
-            metrics = metricReader1.collectAllMetrics();
-            assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
-            assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
-        }
+        Awaitility.await().untilAsserted(() -> 
assertEquals(replicator.computeStats().replicationBacklog, 0));
+        metrics = metricReader1.collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
+        assertMetricDoubleGaugeValue(metrics, 
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
     }
 
     /**

Reply via email to