This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 34c2f30d783 [fix][client] Fix reader message filtering issue during
blue-green cluster switch (#23693)
34c2f30d783 is described below
commit 34c2f30d7838a1d50484985ee8bcfb1d573c50ed
Author: hrzzzz <[email protected]>
AuthorDate: Fri Dec 20 20:05:09 2024 +0800
[fix][client] Fix reader message filtering issue during blue-green cluster
switch (#23693)
Co-authored-by: ruihongzhou <[email protected]>
---
.../broker/service/ClusterMigrationTest.java | 111 +++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 7 ++
2 files changed, 118 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index e6a7d049366..167c154c1fd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
@@ -911,6 +912,116 @@ public class ClusterMigrationTest {
client2.close();
}
+ public void testMigrationWithReader() throws Exception {
+ final String topicName = BrokerTestUtil
+ .newUniqueName("persistent://" + namespace +
"/migrationTopic");
+
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder()
+ .serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ // cluster-1 producer/reader
+ Producer<byte[]> producer1 = client1.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName("cluster1-1")
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ Reader<byte[]> reader1 =client1.newReader()
+ .topic(topicName)
+ .startMessageId(MessageId.earliest)
+ .subscriptionRolePrefix("s1")
+ .create();
+
+ AbstractTopic topic1 = (AbstractTopic)
pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
+ retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
+ retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5,
500);
+ assertFalse(topic1.getProducers().isEmpty());
+ assertFalse(topic1.getSubscriptions().isEmpty());
+
+ // build backlog
+ reader1.close();
+ int n = 8;
+ for (int i = 0; i < n; i++) {
+ producer1.send("test1".getBytes());
+ }
+
+ @Cleanup
+ PulsarClient client2 = PulsarClient.builder()
+ .serviceUrl(url2.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ // cluster-2 producer
+ Producer<byte[]> producer2 = client2.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName("cluster2-1")
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ AbstractTopic topic2 = (AbstractTopic)
pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
+ assertFalse(topic2.getProducers().isEmpty());
+ assertTrue(topic2.getSubscriptions().isEmpty());
+
+ // migrate topic to cluster-2
+ ClusterUrl migratedUrl = new
ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
+ pulsar2.getBrokerServiceUrl(), null);
+ admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
+
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(),
migratedUrl);
+ retryStrategically((test) -> {
+ try {
+ topic1.checkClusterMigration().get();
+ return true;
+ } catch (Exception e) {
+ // ok
+ }
+ return false;
+ }, 10, 500);
+ topic1.checkClusterMigration().get();
+
+ sleep(1000);
+ producer1.sendAsync("test1".getBytes());
+
+ // producer is disconnected from cluster-1
+ retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
+ assertTrue(topic1.getProducers().isEmpty());
+
+ // producer is connected with cluster-2
+ retryStrategically((test) -> topic2.getProducers().size() == 2, 10,
500);
+ assertEquals(topic2.getProducers().size(), 2);
+
+ // try to consume backlog messages from cluster-1
+ reader1 = client1.newReader()
+ .topic(topicName)
+ .startMessageId(MessageId.earliest)
+ .subscriptionRolePrefix("s1")
+ .create();
+ for (int i = 0; i < n; i++) {
+ Message<byte[]> msg = reader1.readNext();
+ assertEquals(msg.getData(), "test1".getBytes());
+ }
+
+ // after consuming all messages, reader should have disconnected from
cluster-1 and reconnected with cluster-2
+ retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10,
500);
+ assertFalse(topic2.getSubscriptions().isEmpty());
+ assertTrue(topic1.getSubscriptions().isEmpty());
+
+ n = 4;
+ // publish messages to cluster-2 and consume them
+ for (int i = 0; i < n; i++) {
+ producer1.send("test2".getBytes());
+ }
+
+ for (int i = 0; i < n; i++) {
+ assertEquals(reader1.readNext(2, TimeUnit.SECONDS).getData(),
"test2".getBytes());
+ }
+
+ client1.close();
+ client2.close();
+ }
+
+
@Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType
subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception
{
log.info("--- Starting
ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---");
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e01c6d4643b..4d1b51e34db 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -37,6 +37,7 @@ import io.netty.util.concurrent.FastThreadLocal;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -3107,6 +3108,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
&&
Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}
+ @Override
+ protected void setRedirectedClusterURI(String serviceUrl, String
serviceUrlTls) throws URISyntaxException {
+ super.setRedirectedClusterURI(serviceUrl, serviceUrlTls);
+ acknowledgmentsGroupingTracker.flushAndClean();
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ConsumerImpl.class);
@VisibleForTesting