massakam commented on a change in pull request #10790: URL: https://github.com/apache/pulsar/pull/10790#discussion_r645684356
########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java ########## @@ -230,6 +233,94 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception { assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2); } + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi1() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api1"; + final String subName = "sub"; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + @Cleanup + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r1 + createReplicatedSubscription(client1, topicName, subName, true); + + @Cleanup + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r2 + createReplicatedSubscription(client2, topicName, subName, true); + + TopicStats stats = admin1.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + + // Disable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + stats = admin1.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + stats = admin2.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + + // Disable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + stats = admin2.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + + // Unload topic in r1 + admin1.topics().unload(topicName); + stats = admin1.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + + // Enable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + stats = admin1.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + stats = admin2.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + } + + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi2() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api2"; + final String subName = "sub"; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); + admin1.topics().createPartitionedTopic(topicName, 2); + + @Cleanup + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r1 + createReplicatedSubscription(client1, topicName, subName, true); + + PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertTrue(stats.subscriptions.get(subName).isReplicated); + } + + // Disable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertFalse(stats.subscriptions.get(subName).isReplicated); + } + + // Enable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertTrue(stats.subscriptions.get(subName).isReplicated); + } Review comment: Added some e2e tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org