eolivelli commented on a change in pull request #10790: URL: https://github.com/apache/pulsar/pull/10790#discussion_r645342826
########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java ########## @@ -230,6 +233,94 @@ public void testReplicationSnapshotStopWhenNoTraffic() throws Exception { assertNotEquals(rsc2.getLastCompletedSnapshotId().get(), snapshot2); } + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi1() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api1"; + final String subName = "sub"; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + @Cleanup + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r1 + createReplicatedSubscription(client1, topicName, subName, true); + + @Cleanup + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r2 + createReplicatedSubscription(client2, topicName, subName, true); + + TopicStats stats = admin1.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + + // Disable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + stats = admin1.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + stats = admin2.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + + // Disable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + stats = admin2.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + + // Unload topic in r1 + admin1.topics().unload(topicName); + stats = 
admin1.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + + // Enable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + stats = admin1.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + stats = admin2.topics().getStats(topicName); + assertFalse(stats.subscriptions.get(subName).isReplicated); + } + + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi2() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api2"; + final String subName = "sub"; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); + admin1.topics().createPartitionedTopic(topicName, 2); + + @Cleanup + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r1 + createReplicatedSubscription(client1, topicName, subName, true); + + PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertTrue(stats.subscriptions.get(subName).isReplicated); + } + + // Disable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertFalse(stats.subscriptions.get(subName).isReplicated); + } + + // Enable replicated subscription in r1 + admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + partitionedStats = admin1.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + 
assertTrue(stats.subscriptions.get(subName).isReplicated); + } Review comment: We are not actually testing that replication starts working or stops working; we are only testing the value of the internal status. Can we add an end-to-end test? Otherwise it would be possible that replication looks enabled but is not actually working. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java ########## @@ -177,12 +178,28 @@ public boolean isReplicated() { return replicatedSubscriptionSnapshotCache != null; } - void setReplicated(boolean replicated) { - this.replicatedSubscriptionSnapshotCache = replicated - ? new ReplicatedSubscriptionSnapshotCache(subName, - topic.getBrokerService().pulsar().getConfiguration() - .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()) - : null; + public void setReplicated(boolean replicated) { + ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); + + if (!replicated || !config.isEnableReplicatedSubscriptions()) { + this.replicatedSubscriptionSnapshotCache = null; + } else if (this.replicatedSubscriptionSnapshotCache == null) { + this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, + config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); + } + + if (this.cursor != null) { + Map<String, Long> properties = this.cursor.getProperties(); Review comment: We should add a method on the cursor to set this property,
because altering a Map that is returned by a getter method may result in unpredictable behaviour (it may be a defensive copy). ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java ########## @@ -177,12 +178,28 @@ public boolean isReplicated() { return replicatedSubscriptionSnapshotCache != null; } - void setReplicated(boolean replicated) { - this.replicatedSubscriptionSnapshotCache = replicated - ? new ReplicatedSubscriptionSnapshotCache(subName, - topic.getBrokerService().pulsar().getConfiguration() - .getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()) - : null; + public void setReplicated(boolean replicated) { + ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); + + if (!replicated || !config.isEnableReplicatedSubscriptions()) { + this.replicatedSubscriptionSnapshotCache = null; + } else if (this.replicatedSubscriptionSnapshotCache == null) { + this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, + config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); + } + + if (this.cursor != null) { + Map<String, Long> properties = this.cursor.getProperties(); + try { + if (replicated) { + properties.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + } else { + properties.remove(REPLICATED_SUBSCRIPTION_PROPERTY); + } + } catch (UnsupportedOperationException e) { Review comment: This catch block looks like a code smell. UnsupportedOperationException is a very generic RuntimeException in Java. We should have a safer way to check for this case: `ManagedCursorImpl#lastMarkDeleteEntry has not been initialized yet` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org