This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3f80cd5f658 [fix][test] Fix flaky ReplicatorTest.testResumptionAfterBacklogRelaxed (#24904)
3f80cd5f658 is described below
commit 3f80cd5f6587df457dfa83aca437abb659284c0a
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Oct 28 17:58:41 2025 +0200
[fix][test] Fix flaky ReplicatorTest.testResumptionAfterBacklogRelaxed (#24904)
---
.../pulsar/broker/service/ReplicatorTest.java | 135 ++++++++++-----------
1 file changed, 64 insertions(+), 71 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 3bcfcee8e2a..6e481c8eaff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -104,6 +104,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -972,6 +973,10 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertTrue(remoteClusters.contains("r1"));
}
+ @DataProvider(name = "retentionPolicies")
+ public static Object[][] retentionPolicies() {
+ return new Object[][] { { RetentionPolicy.producer_exception }, {
RetentionPolicy.producer_request_hold } };
+ }
/**
* Issue #199
@@ -982,91 +987,79 @@ public class ReplicatorTest extends ReplicatorTestBase {
* @throws Exception
*/
- @Test(timeOut = 60000, priority = -1)
- public void testResumptionAfterBacklogRelaxed() throws Exception {
- List<RetentionPolicy> policies = new ArrayList<>();
- policies.add(RetentionPolicy.producer_exception);
- policies.add(RetentionPolicy.producer_request_hold);
-
- for (RetentionPolicy policy : policies) {
- // Use 1Mb quota by default
- admin1.namespaces().setBacklogQuota("pulsar/ns1",
BacklogQuota.builder()
- .limitSize(1 * 1024 * 1024)
- .retentionPolicy(policy)
- .build());
- Thread.sleep(200);
-
- TopicName dest = TopicName
-
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy));
-
- // Producer on r1
- @Cleanup
- MessageProducer producer1 = new MessageProducer(url1, dest);
+ @Test(timeOut = 60000, priority = -1, dataProvider = "retentionPolicies")
+ public void testResumptionAfterBacklogRelaxed(RetentionPolicy policy)
throws Exception {
+ // create a unique namespace for this test case to avoid flakiness
+ String namespace =
newUniqueName("pulsar/testResumptionAfterBacklogRelaxed");
+ Policies policies = new Policies();
+ policies.backlog_quota_map =
Map.of(BacklogQuota.BacklogQuotaType.destination_storage, BacklogQuota.builder()
+ // Use 1Mb quota by default
+ .limitSize(1 * 1024 * 1024)
+ .retentionPolicy(policy)
+ .build());
+ policies.replication_clusters = Set.of("r1", "r2");
+ admin1.namespaces().createNamespace(namespace, policies);
+
+ TopicName dest = TopicName.get("persistent://" + namespace + "/" +
policy);
- // Consumer on r2
- @Cleanup
- MessageConsumer consumer2 = new MessageConsumer(url2, dest);
+ // Producer on r1
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
- // Replicator for r1 -> r2
- PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService()
- .getTopicReference(dest.toString()).get();
- Replicator replicator = topic.getPersistentReplicator("r2");
+ // Consumer on r2
+ @Cleanup
+ MessageConsumer consumer2 = new MessageConsumer(url2, dest);
- // Produce 1 message in r1. This message will be replicated
immediately into r2 and it will become part of
- // local backlog
- producer1.produce(1);
+ // Replicator for r1 -> r2
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
+ .getTopicReference(dest.toString()).get();
+ Replicator replicator = topic.getPersistentReplicator("r2");
- Thread.sleep(500);
+ // Produce 1 message in r1. This message will be replicated
immediately into r2 and it will become part of
+ // local backlog
+ producer1.produce(1);
- // Restrict backlog quota limit to 1 byte to stop replication
- admin1.namespaces().setBacklogQuota("pulsar/ns1",
BacklogQuota.builder()
- .limitSize(1)
- .retentionPolicy(policy)
- .build());
+ Awaitility.await().untilAsserted(() ->
assertEquals(replicator.computeStats().replicationBacklog, 0));
+ var attributes = Attributes.of(
+ OpenTelemetryAttributes.PULSAR_DOMAIN,
dest.getDomain().value(),
+ OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(),
+ OpenTelemetryAttributes.PULSAR_NAMESPACE, dest.getNamespace(),
+ OpenTelemetryAttributes.PULSAR_TOPIC,
dest.getPartitionedTopicName(),
+
OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2
+ );
+ var metrics = metricReader1.collectAllMetrics();
+ assertMetricLongSumValue(metrics,
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
+ assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
- Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+ // Restrict backlog quota limit to 1 byte to stop replication
+ admin1.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
+ .limitSize(1)
+ .retentionPolicy(policy)
+ .build());
- assertEquals(replicator.computeStats().replicationBacklog, 0);
- var attributes = Attributes.of(
- OpenTelemetryAttributes.PULSAR_DOMAIN,
dest.getDomain().value(),
- OpenTelemetryAttributes.PULSAR_TENANT, dest.getTenant(),
- OpenTelemetryAttributes.PULSAR_NAMESPACE,
dest.getNamespace(),
- OpenTelemetryAttributes.PULSAR_TOPIC,
dest.getPartitionedTopicName(),
-
OpenTelemetryAttributes.PULSAR_REPLICATION_REMOTE_CLUSTER_NAME, cluster2
- );
- var metrics = metricReader1.collectAllMetrics();
- assertMetricLongSumValue(metrics,
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
- assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
+ Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
- // Next message will not be replicated, because r2 has reached the
quota
- producer1.produce(1);
- Thread.sleep(500);
+ // Next message will not be replicated, because r2 has reached the
quota
+ producer1.produce(1);
- assertEquals(replicator.computeStats().replicationBacklog, 1);
- metrics = metricReader1.collectAllMetrics();
- assertMetricLongSumValue(metrics,
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 1);
- assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes,
- aDouble -> assertThat(aDouble).isPositive());
+ Awaitility.await().untilAsserted(() ->
assertEquals(replicator.computeStats().replicationBacklog, 1));
- // Consumer will now drain 1 message and the replication backlog
will be cleared
- consumer2.receive(1);
+ metrics = metricReader1.collectAllMetrics();
+ assertMetricLongSumValue(metrics,
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 1);
+ assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes,
+ aDouble -> assertThat(aDouble).isPositive());
- // Wait until the 2nd message got delivered to consumer
- consumer2.receive(1);
+ // Consumer will now drain 1 message and the replication backlog will
be cleared
+ consumer2.receive(1);
- int retry = 10;
- for (int i = 0; i < retry &&
replicator.computeStats().replicationBacklog > 0; i++) {
- if (i != retry - 1) {
- Thread.sleep(100);
- }
- }
+ // Wait until the 2nd message got delivered to consumer
+ consumer2.receive(1);
- assertEquals(replicator.computeStats().replicationBacklog, 0);
- metrics = metricReader1.collectAllMetrics();
- assertMetricLongSumValue(metrics,
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
- assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
- }
+ Awaitility.await().untilAsserted(() ->
assertEquals(replicator.computeStats().replicationBacklog, 0));
+ metrics = metricReader1.collectAllMetrics();
+ assertMetricLongSumValue(metrics,
OpenTelemetryReplicatorStats.BACKLOG_COUNTER, attributes, 0);
+ assertMetricDoubleGaugeValue(metrics,
OpenTelemetryReplicatorStats.DELAY_GAUGE, attributes, 0.0);
}
/**