This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4399b2743cd [improve] [broker] PIP-356 Support Geo-Replication starts 
at earliest position (#22856)
4399b2743cd is described below

commit 4399b2743cdc01070a871b32f7bc02ac736e9c80
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Wed Jun 19 22:29:17 2024 +0800

    [improve] [broker] PIP-356 Support Geo-Replication starts at earliest 
position (#22856)
    
    (cherry picked from commit 5fc0eafab9ea2a4ece7b87218404489c270b64e6)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 ++
 .../broker/service/persistent/PersistentTopic.java |   9 +-
 .../broker/service/OneWayReplicatorTest.java       | 103 ++++++++++++++++++++-
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  52 +++++++++++
 4 files changed, 167 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9efe1856509..11d1d663f42 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1345,6 +1345,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             doc = "Max number of snapshot to be cached per subscription.")
     private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            dynamic = true,
+            doc = "The position that replication task start at, it can be set 
to earliest or latest (default).")
+    private String replicationStartAt = "latest";
+
     @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3331dcb53b3..dbbb0b07ce3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2062,7 +2062,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, 
remoteCluster);
-        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+        final InitialPosition initialPosition;
+        if (MessageId.earliest.toString()
+                
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt()))
 {
+            initialPosition = InitialPosition.Earliest;
+        } else {
+            initialPosition = InitialPosition.Latest;
+        }
+        ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() 
{
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index b751d269d1f..5eb1385d85b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -36,6 +37,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.UUID;
@@ -71,11 +73,12 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
@@ -935,4 +938,100 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             });
         }
     }
+
+    protected void enableReplication(String topic) throws Exception {
+        admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, 
cluster2));
+    }
+
+    protected void disableReplication(String topic) throws Exception {
+        admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, 
cluster2));
+    }
+
+    @Test
+    public void testConfigReplicationStartAt() throws Exception {
+        // Initialize.
+        String ns1 = defaultTenant + "/ns_" + 
UUID.randomUUID().toString().replace("-", "");
+        String subscription1 = "s1";
+        admin1.namespaces().createNamespace(ns1);
+        if (!usingGlobalZK) {
+            admin2.namespaces().createNamespace(ns1);
+        }
+
+        RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 
1024);
+        admin1.namespaces().setRetention(ns1, retentionPolicies);
+        admin2.namespaces().setRetention(ns1, retentionPolicies);
+
+        // 1. default config.
+        // Enable replication for topic1.
+        final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + 
ns1 + "/tp_");
+        admin1.topics().createNonPartitionedTopicAsync(topic1);
+        admin1.topics().createSubscription(topic1, subscription1, 
MessageId.earliest);
+        Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(topic1).create();
+        p1.send("msg-1");
+        p1.close();
+        enableReplication(topic1);
+        // Verify: since the replication was started at latest, there is no 
message to consume.
+        Consumer<String> c1 = 
client2.newConsumer(Schema.STRING).topic(topic1).subscriptionName(subscription1)
+                .subscribe();
+        Message<String> msg1 = c1.receive(2, TimeUnit.SECONDS);
+        assertNull(msg1);
+        c1.close();
+        disableReplication(topic1);
+
+        // 2.Update config: start at "earliest".
+        admin1.brokers().updateDynamicConfiguration("replicationStartAt", 
MessageId.earliest.toString());
+        Awaitility.await().untilAsserted(() -> {
+            
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
+        });
+
+        final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + 
ns1 + "/tp_");
+        admin1.topics().createNonPartitionedTopicAsync(topic2);
+        admin1.topics().createSubscription(topic2, subscription1, 
MessageId.earliest);
+        Producer<String> p2 = 
client1.newProducer(Schema.STRING).topic(topic2).create();
+        p2.send("msg-1");
+        p2.close();
+        enableReplication(topic2);
+        // Verify: since the replication was started at earliest, there is one 
message to consume.
+        Consumer<String> c2 = 
client2.newConsumer(Schema.STRING).topic(topic2).subscriptionName(subscription1)
+                .subscribe();
+        Message<String> msg2 = c2.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg2);
+        assertEquals(msg2.getValue(), "msg-1");
+        c2.close();
+        disableReplication(topic2);
+
+        // 2.Update config: start at "latest".
+        admin1.brokers().updateDynamicConfiguration("replicationStartAt", 
MessageId.latest.toString());
+        Awaitility.await().untilAsserted(() -> {
+            
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
+        });
+
+        final String topic3 = BrokerTestUtil.newUniqueName("persistent://" + 
ns1 + "/tp_");
+        admin1.topics().createNonPartitionedTopicAsync(topic3);
+        admin1.topics().createSubscription(topic3, subscription1, 
MessageId.earliest);
+        Producer<String> p3 = 
client1.newProducer(Schema.STRING).topic(topic3).create();
+        p3.send("msg-1");
+        p3.close();
+        enableReplication(topic3);
+        // Verify: since the replication was started at latest, there is no 
message to consume.
+        Consumer<String> c3 = 
client2.newConsumer(Schema.STRING).topic(topic3).subscriptionName(subscription1)
+                .subscribe();
+        Message<String> msg3 = c3.receive(2, TimeUnit.SECONDS);
+        assertNull(msg3);
+        c3.close();
+        disableReplication(topic3);
+
+        // cleanup.
+        // There is no good way to delete topics when using global ZK, skip 
cleanup.
+        admin1.namespaces().setNamespaceReplicationClusters(ns1, 
Collections.singleton(cluster1));
+        admin1.namespaces().unload(ns1);
+        admin2.namespaces().setNamespaceReplicationClusters(ns1, 
Collections.singleton(cluster2));
+        admin2.namespaces().unload(ns1);
+        admin1.topics().delete(topic1, false);
+        admin2.topics().delete(topic1, false);
+        admin1.topics().delete(topic2, false);
+        admin2.topics().delete(topic2, false);
+        admin1.topics().delete(topic3, false);
+        admin2.topics().delete(topic3, false);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index b8f8edce247..31e94f435f0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -18,7 +18,19 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -109,4 +121,44 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
     public void testReloadWithTopicLevelGeoReplication(ReplicationLevel 
replicationLevel) throws Exception {
         super.testReloadWithTopicLevelGeoReplication(replicationLevel);
     }
+
+    @Test
+    @Override
+    public void testConfigReplicationStartAt() throws Exception {
+        // Initialize.
+        String ns1 = defaultTenant + "/ns_" + 
UUID.randomUUID().toString().replace("-", "");
+        String subscription1 = "s1";
+        admin1.namespaces().createNamespace(ns1);
+        RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 
1024);
+        admin1.namespaces().setRetention(ns1, retentionPolicies);
+        admin2.namespaces().setRetention(ns1, retentionPolicies);
+
+        // Update config: start at "earliest".
+        admin1.brokers().updateDynamicConfiguration("replicationStartAt", 
MessageId.earliest.toString());
+        Awaitility.await().untilAsserted(() -> {
+            
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
+        });
+
+        // Verify: since the replication was started at earliest, there is one 
message to consume.
+        final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + 
ns1 + "/tp_");
+        admin1.topics().createNonPartitionedTopicAsync(topic1);
+        admin1.topics().createSubscription(topic1, subscription1, 
MessageId.earliest);
+        org.apache.pulsar.client.api.Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(topic1).create();
+        p1.send("msg-1");
+        p1.close();
+
+        admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+        org.apache.pulsar.client.api.Consumer<String> c1 = 
client2.newConsumer(Schema.STRING).topic(topic1)
+                .subscriptionName(subscription1).subscribe();
+        Message<String> msg2 = c1.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg2);
+        assertEquals(msg2.getValue(), "msg-1");
+        c1.close();
+
+        // cleanup.
+        admin1.brokers().updateDynamicConfiguration("replicationStartAt", 
MessageId.latest.toString());
+        Awaitility.await().untilAsserted(() -> {
+            
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
+        });
+    }
 }

Reply via email to