This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 04d1225fbb1 [feat] [broker] PIP-188 Fix cluster migration state store 
into local metadatastore  (#21359)
04d1225fbb1 is described below

commit 04d1225fbb1485333f44138e587aadce34ea1f0e
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Oct 24 23:59:09 2023 -0700

    [feat] [broker] PIP-188 Fix cluster migration state store into local 
metadatastore  (#21359)
    
    Co-authored-by: Rajan Dhabalia <rdhaba...@oath.com>
---
 .../pulsar/broker/resources/ClusterResources.java  |  40 ++++++-
 .../pulsar/broker/resources/PulsarResources.java   |   3 +-
 .../pulsar/broker/admin/impl/ClustersBase.java     |  44 ++++++-
 .../pulsar/broker/service/AbstractTopic.java       |  31 +++--
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../org/apache/pulsar/broker/service/Producer.java |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../broker/service/ClusterMigrationTest.java       | 128 +++++++++------------
 .../org/apache/pulsar/client/admin/Clusters.java   |  43 ++++++-
 .../pulsar/common/policies/data/ClusterData.java   |  26 -----
 .../common/policies/data/ClusterPolicies.java      |  61 ++++++++++
 .../pulsar/client/admin/internal/ClustersImpl.java |  16 ++-
 .../org/apache/pulsar/admin/cli/CmdClusters.java   |  13 ++-
 .../common/policies/data/ClusterDataImpl.java      |  31 +----
 .../common/policies/data/ClusterPoliciesImpl.java  |  85 ++++++++++++++
 .../common/policies/data/ClusterDataImplTest.java  |   3 -
 18 files changed, 380 insertions(+), 154 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 843cec7b205..b0cc50edf1f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import lombok.Getter;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -39,10 +40,19 @@ public class ClusterResources extends 
BaseResources<ClusterData> {
 
     @Getter
     private FailureDomainResources failureDomainResources;
-
-    public ClusterResources(MetadataStore store, int operationTimeoutSec) {
-        super(store, ClusterData.class, operationTimeoutSec);
-        this.failureDomainResources = new FailureDomainResources(store, 
FailureDomainImpl.class, operationTimeoutSec);
+    @Getter
+    private ClusterPoliciesResources clusterPoliciesResources;
+
+    public ClusterResources(MetadataStore localStore, MetadataStore 
configurationStore, int operationTimeoutSec) {
+        super(configurationStore, ClusterData.class, operationTimeoutSec);
+        this.failureDomainResources = new 
FailureDomainResources(configurationStore, FailureDomainImpl.class,
+                operationTimeoutSec);
+        if (localStore != null) {
+            this.clusterPoliciesResources = new 
ClusterPoliciesResources(localStore, ClusterPoliciesImpl.class,
+                    operationTimeoutSec);
+        } else {
+            this.clusterPoliciesResources = null;
+        }
     }
 
     public CompletableFuture<Set<String>> listAsync() {
@@ -216,4 +226,26 @@ public class ClusterResources extends 
BaseResources<ClusterData> {
             });
         }
     }
+
+    public static class ClusterPoliciesResources extends 
BaseResources<ClusterPoliciesImpl> {
+        public static final String LOCAL_POLICIES_PATH = "policies";
+
+        public ClusterPoliciesResources(MetadataStore store, 
Class<ClusterPoliciesImpl> clazz,
+                int operationTimeoutSec) {
+            super(store, clazz, operationTimeoutSec);
+        }
+
+        public Optional<ClusterPoliciesImpl> getClusterPolicies(String 
clusterName) throws MetadataStoreException {
+            return get(joinPath(BASE_CLUSTERS_PATH, clusterName, 
LOCAL_POLICIES_PATH));
+        }
+
+        public CompletableFuture<Optional<ClusterPoliciesImpl>> 
getClusterPoliciesAsync(String clusterName) {
+            return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, 
LOCAL_POLICIES_PATH));
+        }
+
+        public CompletableFuture<Void> setPoliciesWithCreateAsync(String 
clusterName,
+                Function<Optional<ClusterPoliciesImpl>, ClusterPoliciesImpl> 
createFunction) {
+            return setWithCreateAsync(joinPath(BASE_CLUSTERS_PATH, 
clusterName, LOCAL_POLICIES_PATH), createFunction);
+        }
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index ad872a5356c..fe7ffe0bc7b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -61,7 +61,8 @@ public class PulsarResources {
             int operationTimeoutSec) {
         if (configurationMetadataStore != null) {
             tenantResources = new TenantResources(configurationMetadataStore, 
operationTimeoutSec);
-            clusterResources = new 
ClusterResources(configurationMetadataStore, operationTimeoutSec);
+            clusterResources = new ClusterResources(localMetadataStore, 
configurationMetadataStore,
+                    operationTimeoutSec);
             namespaceResources = new 
NamespaceResources(configurationMetadataStore, operationTimeoutSec);
             resourcegroupResources = new 
ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
         } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 5d4ed54c334..b8743933098 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -60,8 +60,10 @@ import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
@@ -247,6 +249,41 @@ public class ClustersBase extends AdminResource {
                 });
     }
 
+    @GET
+    @Path("/{cluster}/migrate")
+    @ApiOperation(
+        value = "Get the cluster migration configuration for the specified 
cluster.",
+        response = ClusterDataImpl.class,
+        notes = "This operation requires Pulsar superuser privileges."
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 200, message = "Return the cluster data.", 
response = ClusterDataImpl.class),
+            @ApiResponse(code = 403, message = "Don't have admin permission."),
+            @ApiResponse(code = 404, message = "Cluster doesn't exist."),
+            @ApiResponse(code = 500, message = "Internal server error.")
+    })
+    public ClusterPolicies getClusterMigration(
+        @ApiParam(
+            value = "The cluster name",
+            required = true
+        )
+        @PathParam("cluster") String cluster
+    ) {
+        validateSuperUserAccess();
+
+        try {
+            return 
clusterResources().getClusterPoliciesResources().getClusterPolicies(cluster)
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Cluster does not exist"));
+        } catch (Exception e) {
+            log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, 
e);
+            if (e instanceof RestException) {
+                throw (RestException) e;
+            } else {
+                throw new RestException(e);
+            }
+        }
+    }
+
     @POST
     @Path("/{cluster}/migrate")
     @ApiOperation(
@@ -286,8 +323,9 @@ public class ClustersBase extends AdminResource {
         }
         validateSuperUserAccessAsync()
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenCompose(__ -> 
clusterResources().updateClusterAsync(cluster, old -> {
-                    ClusterDataImpl data = (ClusterDataImpl) old;
+                .thenCompose(__ -> 
clusterResources().getClusterPoliciesResources().setPoliciesWithCreateAsync(cluster,
+                        old -> {
+                    ClusterPoliciesImpl data = old.orElse(new 
ClusterPoliciesImpl());
                     data.setMigrated(isMigrated);
                     data.setMigratedClusterUrl(clusterUrl);
                     return data;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index a8f25f61a94..7a23312c477 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -65,7 +65,7 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
@@ -1358,14 +1358,29 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
     }
 
     public static CompletableFuture<Optional<ClusterUrl>> 
getMigratedClusterUrlAsync(PulsarService pulsar,
-                                                                               
      String topic) {
-        return 
pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
+            String topic) {
+        CompletableFuture<Optional<ClusterUrl>> result = new 
CompletableFuture<>();
+        
pulsar.getPulsarResources().getClusterResources().getClusterPoliciesResources()
+                .getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
                 .thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
-                        ((clusterData, isNamespaceMigrationEnabled)
-                                -> ((clusterData.isPresent() && 
clusterData.get().isMigrated())
-                        || isNamespaceMigrationEnabled)
-                        ? 
Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
-                        : Optional.empty()));
+                        ((clusterData, isNamespaceMigrationEnabled) -> {
+                            Optional<ClusterUrl> url = 
((clusterData.isPresent() && clusterData.get().isMigrated())
+                                    || isNamespaceMigrationEnabled)
+                                            ? 
Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
+                                            : Optional.empty();
+                            return url;
+                        }))
+                .thenAccept(res -> {
+                    // cluster policies future is completed by metadata-store 
thread and continuing further
+                    // processing in the same metadata store can cause 
deadlock while creating topic as
+                    // create topic path may have blocking call on 
metadata-store. so, complete future on a
+                    // separate thread to avoid deadlock.
+                    pulsar.getExecutor().execute(() -> result.complete(res));
+                }).exceptionally(ex -> {
+                    pulsar.getExecutor().execute(() -> 
result.completeExceptionally(ex.getCause()));
+                    return null;
+                });
+        return result;
     }
 
     private static CompletableFuture<Boolean> 
isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 023ede74b4f..e72c805d738 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -56,7 +56,7 @@ import org.apache.pulsar.common.api.proto.KeyLongValue;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index f7d2bb2dd27..acaa7c02d19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -50,7 +50,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 95f139dc11e..da51eaea8fb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -152,7 +152,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 76e9f261ca6..f3857b5ad2a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -73,7 +73,7 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d35d284d32..a8a921f3b62 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -147,7 +147,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
 import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index 2139a7bc12e..2fa201cf958 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
@@ -86,36 +86,12 @@ public class ClusterMigrationTest {
     PulsarService pulsar4;
     PulsarAdmin admin4;
 
-    @DataProvider(name = "TopicsubscriptionTypes")
-    public Object[][] subscriptionTypes() {
-        return new Object[][] {
-                {true, SubscriptionType.Shared},
-                {true, SubscriptionType.Key_Shared},
-                {true, SubscriptionType.Shared},
-                {true, SubscriptionType.Key_Shared},
-
-                {false, SubscriptionType.Shared},
-                {false, SubscriptionType.Key_Shared},
-                {false, SubscriptionType.Shared},
-                {false, SubscriptionType.Key_Shared},
-        };
-    }
-
     @DataProvider(name="NamespaceMigrationTopicSubscriptionTypes")
     public Object[][] namespaceMigrationSubscriptionTypes() {
         return new Object[][] {
-                {true, SubscriptionType.Shared, true, false},
-                {true, SubscriptionType.Key_Shared, true, false},
-                {true, SubscriptionType.Shared, false, true},
-                {true, SubscriptionType.Key_Shared, false, true},
-                {true, SubscriptionType.Shared, true, true},
-                {true, SubscriptionType.Key_Shared, true, true},
-                {false, SubscriptionType.Shared, true, false},
-                {false, SubscriptionType.Key_Shared, true, false},
-                {false, SubscriptionType.Shared, false, true},
-                {false, SubscriptionType.Key_Shared,false, true},
-                {false, SubscriptionType.Shared, true, true},
-                {false, SubscriptionType.Key_Shared,true, true},
+            {SubscriptionType.Shared, true, false},
+            {SubscriptionType.Shared, false, true},
+            {SubscriptionType.Shared, true, true},
         };
     }
 
@@ -227,14 +203,14 @@ public class ClusterMigrationTest {
     @AfterMethod(alwaysRun = true, timeOut = 300000)
     protected void cleanup() throws Exception {
         log.info("--- Shutting down ---");
-        broker1.cleanup();
         admin1.close();
-        broker2.cleanup();
         admin2.close();
-        broker3.cleanup();
         admin3.close();
-        broker4.cleanup();
         admin4.close();
+        broker1.cleanup();
+        broker2.cleanup();
+        broker3.cleanup();
+        broker4.cleanup();
     }
 
     @BeforeMethod(alwaysRun = true)
@@ -259,11 +235,11 @@ public class ClusterMigrationTest {
      * (11) Restart Broker-1 and connect producer/consumer on cluster-1
      * @throws Exception
      */
-    @Test(dataProvider = "TopicsubscriptionTypes")
-    public void testClusterMigration(boolean persistent, SubscriptionType 
subType) throws Exception {
+    @Test
+    public void testClusterMigration() throws Exception {
         log.info("--- Starting ReplicatorTest::testClusterMigration ---");
         final String topicName = BrokerTestUtil
-                .newUniqueName((persistent ? "persistent" : "non-persistent") 
+ "://" + namespace + "/migrationTopic");
+                .newUniqueName("persistent://" + namespace + 
"/migrationTopic");
 
         @Cleanup
         PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
@@ -271,7 +247,7 @@ public class ClusterMigrationTest {
         // cluster-1 producer/consumer
         Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).enableBatching(false)
                 
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
-        Consumer<byte[]> consumer1 = 
client1.newConsumer().topic(topicName).subscriptionType(subType)
+        Consumer<byte[]> consumer1 = 
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("s1").subscribe();
         AbstractTopic topic1 = (AbstractTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
         retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
@@ -298,6 +274,7 @@ public class ClusterMigrationTest {
         ClusterUrl migratedUrl = new 
ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
                 pulsar2.getBrokerServiceUrl(), 
pulsar2.getBrokerServiceUrlTls());
         admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
+        
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(),
 migratedUrl);
 
         retryStrategically((test) -> {
             try {
@@ -330,12 +307,10 @@ public class ClusterMigrationTest {
 
         // try to consume backlog messages from cluster-1
         consumer1 = 
client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
-        if (persistent) {
-            for (int i = 0; i < n; i++) {
-                Message<byte[]> msg = consumer1.receive();
-                assertEquals(msg.getData(), "test1".getBytes());
-                consumer1.acknowledge(msg);
-            }
+        for (int i = 0; i < n; i++) {
+            Message<byte[]> msg = consumer1.receive();
+            assertEquals(msg.getData(), "test1".getBytes());
+            consumer1.acknowledge(msg);
         }
         // after consuming all messages, consumer should have disconnected
         // from cluster-1 and reconnect with cluster-2
@@ -351,13 +326,13 @@ public class ClusterMigrationTest {
         assertTrue(topic1.getSubscriptions().isEmpty());
 
         // not also create a new consumer which should also reconnect to 
cluster-2
-        Consumer<byte[]> consumer2 = 
client1.newConsumer().topic(topicName).subscriptionType(subType)
+        Consumer<byte[]> consumer2 = 
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("s2").subscribe();
         retryStrategically((test) -> topic2.getSubscription("s2") != null, 10, 
500);
         assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty());
 
         // new sub on migration topic must be redirected immediately
-        Consumer<byte[]> consumerM = 
client1.newConsumer().topic(topicName).subscriptionType(subType)
+        Consumer<byte[]> consumerM = 
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("sM").subscribe();
         
assertFalse(pulsar2.getBrokerService().getTopicReference(topicName).get().getSubscription("sM").getConsumers()
                 .isEmpty());
@@ -365,7 +340,7 @@ public class ClusterMigrationTest {
 
         // migrate topic after creating subscription
         String newTopicName = topicName + "-new";
-        consumerM = 
client1.newConsumer().topic(newTopicName).subscriptionType(subType)
+        consumerM = 
client1.newConsumer().topic(newTopicName).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("sM").subscribe();
         retryStrategically((t) -> 
pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
         
pulsar2.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
@@ -392,8 +367,8 @@ public class ClusterMigrationTest {
 
         // create non-migrated topic which should connect to cluster-1
         String diffTopic = BrokerTestUtil
-                .newUniqueName((persistent ? "persistent" : "non-persistent") 
+ "://" + namespace + "/migrationTopic");
-        Consumer<byte[]> consumerDiff = 
client1.newConsumer().topic(diffTopic).subscriptionType(subType)
+                .newUniqueName("persistent://" + namespace + 
"/migrationTopic");
+        Consumer<byte[]> consumerDiff = 
client1.newConsumer().topic(diffTopic).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("s1-d").subscribe();
         Producer<byte[]> producerDiff = 
client1.newProducer().topic(diffTopic).enableBatching(false)
                 
.producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
@@ -408,7 +383,7 @@ public class ClusterMigrationTest {
         broker1.restart();
         Producer<byte[]> producer4 = 
client1.newProducer().topic(topicName).enableBatching(false)
                 
.producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
-        Consumer<byte[]> consumer3 = 
client1.newConsumer().topic(topicName).subscriptionType(subType)
+        Consumer<byte[]> consumer3 = 
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("s3").subscribe();
         retryStrategically((test) -> topic2.getProducers().size() == 4, 10, 
500);
         assertTrue(topic2.getProducers().size() == 4);
@@ -421,15 +396,16 @@ public class ClusterMigrationTest {
             assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), 
"test3".getBytes());
         }
 
+        client1.close();
+        client2.close();
         log.info("Successfully consumed messages by migrated consumers");
     }
 
-    @Test(dataProvider = "TopicsubscriptionTypes")
-    public void testClusterMigrationWithReplicationBacklog(boolean persistent, 
SubscriptionType subType) throws Exception {
+    @Test
+    public void testClusterMigrationWithReplicationBacklog() throws Exception {
         log.info("--- Starting 
ReplicatorTest::testClusterMigrationWithReplicationBacklog ---");
-        persistent = true;
         final String topicName = BrokerTestUtil
-                .newUniqueName((persistent ? "persistent" : "non-persistent") 
+ "://" + namespace + "/migrationTopic");
+                .newUniqueName("persistent://" + namespace + 
"/migrationTopic");
 
         @Cleanup
         PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
@@ -440,11 +416,11 @@ public class ClusterMigrationTest {
         // cluster-1 producer/consumer
         Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).enableBatching(false)
                 
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
-        Consumer<byte[]> consumer1 = 
client1.newConsumer().topic(topicName).subscriptionType(subType)
+        Consumer<byte[]> consumer1 = 
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("s1").subscribe();
 
         // cluster-3 consumer
-        Consumer<byte[]> consumer3 = 
client3.newConsumer().topic(topicName).subscriptionType(subType)
+        Consumer<byte[]> consumer3 = 
client3.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
                 .subscriptionName("s1").subscribe();
         AbstractTopic topic1 = (AbstractTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
         retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
@@ -526,6 +502,10 @@ public class ClusterMigrationTest {
         // verify that the producer1 is now is now connected to migrated 
cluster "r2" since backlog is cleared.
         retryStrategically((test) -> topic2.getProducers().size()==2, 10, 500);
         assertEquals(topic2.getProducers().size(), 2);
+
+        client1.close();
+        client2.close();
+        client3.close();
     }
 
     /**
@@ -638,17 +618,19 @@ public class ClusterMigrationTest {
             }
         }, 10, 500);
         
assertFalse(pulsar2Topic.getSubscription("s1").getConsumers().isEmpty());
+
+        client1.close();
     }
 
     @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
-    public void testNamespaceMigration(boolean persistent, SubscriptionType 
subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception 
{
+    public void testNamespaceMigration(SubscriptionType subType, boolean 
isClusterMigrate, boolean isNamespaceMigrate) throws Exception {
         log.info("--- Starting Test::testNamespaceMigration ---");
         // topic for the namespace1 (to be migrated)
         final String topicName = BrokerTestUtil
-                .newUniqueName((persistent ? "persistent" : "non-persistent") 
+ "://" + namespace + "/migrationTopic");
+                .newUniqueName("persistent://" + namespace + 
"/migrationTopic");
         // topic for namespace2 (not to be migrated)
         final String topicName2 = BrokerTestUtil
-                .newUniqueName((persistent ? "persistent" : "non-persistent") 
+ "://" + namespaceNotToMigrate + "/migrationTopic");
+                .newUniqueName("persistent://" + namespaceNotToMigrate + 
"/migrationTopic");
 
         @Cleanup
         PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
@@ -764,16 +746,14 @@ public class ClusterMigrationTest {
         // try to consume backlog messages from cluster-1
         blueConsumerNs1_1 = 
client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
         blueConsumerNs2_1 = 
client1.newConsumer().topic(topicName2).subscriptionName("s1").subscribe();
-        if (persistent) {
-            for (int i = 0; i < n; i++) {
-                Message<byte[]> msg = blueConsumerNs1_1.receive();
-                assertEquals(msg.getData(), "test1".getBytes());
-                blueConsumerNs1_1.acknowledge(msg);
-
-                Message<byte[]> msg2 = blueConsumerNs2_1.receive();
-                assertEquals(msg2.getData(), "test1".getBytes());
-                blueConsumerNs2_1.acknowledge(msg2);
-            }
+        for (int i = 0; i < n; i++) {
+            Message<byte[]> msg = blueConsumerNs1_1.receive();
+            assertEquals(msg.getData(), "test1".getBytes());
+            blueConsumerNs1_1.acknowledge(msg);
+
+            Message<byte[]> msg2 = blueConsumerNs2_1.receive();
+            assertEquals(msg2.getData(), "test1".getBytes());
+            blueConsumerNs2_1.acknowledge(msg2);
         }
         // after consuming all messages, consumer should have disconnected
         // from blue and reconnect with green
@@ -862,7 +842,7 @@ public class ClusterMigrationTest {
 
         // create non-migrated topic which should connect to blue
         String diffTopic = BrokerTestUtil
-                .newUniqueName((persistent ? "persistent" : "non-persistent") 
+ "://" + namespace + "/migrationTopic");
+                .newUniqueName("persistent://" + namespace + 
"/migrationTopic");
         Consumer<byte[]> consumerDiff = 
client1.newConsumer().topic(diffTopic).subscriptionType(subType)
                 .subscriptionName("s1-d").subscribe();
         Producer<byte[]> producerDiff = 
client1.newProducer().topic(diffTopic).enableBatching(false)
@@ -902,18 +882,19 @@ public class ClusterMigrationTest {
         blueProducerNs2_1.close();
         greenProducerNs1_1.close();
         greenProducerNs2_1.close();
+        client1.close();
+        client2.close();
     }
 
     @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
-    public void testNamespaceMigrationWithReplicationBacklog(boolean 
persistent, SubscriptionType subType, boolean isClusterMigrate, boolean 
isNamespaceMigrate) throws Exception {
+    public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType 
subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception 
{
         log.info("--- Starting 
ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---");
-        persistent = true;
         // topic for namespace1 (to be migrated)
         final String topicName = BrokerTestUtil
-                .newUniqueName((persistent ? "persistent" : "non-persistent") 
+ "://" + namespace + "/migrationTopic");
+                .newUniqueName("persistent://" + namespace + 
"/migrationTopic");
         // topic for namespace2 (not to be migrated)
         final String topicName2 = BrokerTestUtil
-                .newUniqueName((persistent ? "persistent" : "non-persistent") 
+ "://" + namespaceNotToMigrate + "/migrationTopic");
+                .newUniqueName("persistent://" + namespaceNotToMigrate + 
"/migrationTopic");
 
         @Cleanup
         PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
@@ -1069,6 +1050,9 @@ public class ClusterMigrationTest {
         blueConsumerNs2_1.close();
         greenProducerNs1_1.close();
         greenProducerNs2_1.close();
+        client1.close();
+        client2.close();
+        client3.close();
     }
 
     static class TestBroker extends MockedPulsarServiceBaseTest {
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 4178bc7483d..53e66809465 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -29,7 +29,8 @@ import 
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 
@@ -209,6 +210,46 @@ public interface Clusters {
      */
     CompletableFuture<Void> updatePeerClusterNamesAsync(String cluster, 
LinkedHashSet<String> peerClusterNames);
 
+    /**
+     * Get the cluster migration configuration data for the specified cluster.
+     * <p/>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ serviceUrl : "http://my-broker.example.com:8080/"; }</code>
+     * </pre>
+     *
+     * @param cluster
+     *            Cluster name
+     *
+     * @return the cluster configuration
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to get the configuration of 
the cluster
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    ClusterPolicies getClusterMigration(String cluster) throws 
PulsarAdminException;
+
+    /**
+     * Get the cluster migration configuration data for the specified cluster 
asynchronously.
+     * <p/>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ serviceUrl : "http://my-broker.example.com:8080/"; }</code>
+     * </pre>
+     *
+     * @param cluster
+     *            Cluster name
+     *
+     * @return the cluster configuration
+     *
+     */
+    CompletableFuture<ClusterPolicies> getClusterMigrationAsync(String 
cluster);
+
     /**
      * Update the configuration for a cluster migration.
      * <p/>
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index 0b3e5aa49cb..1f7126521c6 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -19,9 +19,6 @@
 package org.apache.pulsar.common.policies.data;
 
 import java.util.LinkedHashSet;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.admin.utils.ReflectionUtils;
 import org.apache.pulsar.client.api.ProxyProtocol;
 
@@ -70,10 +67,6 @@ public interface ClusterData {
 
     String getListenerName();
 
-    boolean isMigrated();
-
-    ClusterUrl getMigratedClusterUrl();
-
     interface Builder {
         Builder serviceUrl(String serviceUrl);
 
@@ -119,10 +112,6 @@ public interface ClusterData {
 
         Builder listenerName(String listenerName);
 
-        Builder migrated(boolean migrated);
-
-        Builder migratedClusterUrl(ClusterUrl migratedClusterUrl);
-
         ClusterData build();
     }
 
@@ -131,19 +120,4 @@ public interface ClusterData {
     static Builder builder() {
         return 
ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.ClusterDataImpl");
     }
-
-    @Data
-    @NoArgsConstructor
-    @AllArgsConstructor
-    class ClusterUrl {
-        String serviceUrl;
-        String serviceUrlTls;
-        String brokerServiceUrl;
-        String brokerServiceUrlTls;
-
-        public boolean isEmpty() {
-            return serviceUrl != null && serviceUrlTls != null && 
brokerServiceUrl == null
-                    && brokerServiceUrlTls == null;
-        }
-    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java
new file mode 100644
index 00000000000..b95f6bb19ce
--- /dev/null
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.client.admin.utils.ReflectionUtils;
+
+public interface ClusterPolicies {
+    boolean isMigrated();
+
+    ClusterUrl getMigratedClusterUrl();
+
+    interface Builder {
+        Builder migrated(boolean migrated);
+
+        Builder migratedClusterUrl(ClusterUrl migratedClusterUrl);
+
+        ClusterPolicies build();
+    }
+
+    Builder clone();
+
+    static Builder builder() {
+        return 
ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.ClusterPoliciesImpl");
+    }
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @EqualsAndHashCode
+    class ClusterUrl {
+        String serviceUrl;
+        String serviceUrlTls;
+        String brokerServiceUrl;
+        String brokerServiceUrlTls;
+
+        public boolean isEmpty() {
+            return serviceUrl != null && serviceUrlTls != null && 
brokerServiceUrl == null
+                    && brokerServiceUrlTls == null;
+        }
+    }
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 02e44aca626..231d4506d61 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -34,8 +34,10 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
+import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
@@ -107,6 +109,18 @@ public class ClustersImpl extends BaseResource implements 
Clusters {
         return asyncPostRequest(path, Entity.entity(peerClusterNames, 
MediaType.APPLICATION_JSON));
     }
 
+    @Override
+    public ClusterPolicies getClusterMigration(String cluster) throws 
PulsarAdminException {
+        return sync(() -> getClusterMigrationAsync(cluster));
+    }
+
+    @Override
+    public CompletableFuture<ClusterPolicies> getClusterMigrationAsync(String 
cluster) {
+        WebTarget path = adminClusters.path(cluster).path("migrate");
+        return asyncGetRequest(path, new FutureCallback<ClusterPoliciesImpl>() 
{
+        }).thenApply(policies -> policies);
+    }
+
     @Override
     public void updateClusterMigration(String cluster, boolean isMigrated, 
ClusterUrl clusterUrl)
             throws PulsarAdminException {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 033146aa607..646f4ef0f50 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -32,8 +32,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 
@@ -154,6 +154,17 @@ public class CmdClusters extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the cluster migration configuration 
data for the specified cluster")
+    private class GetClusterMigration extends CliCommand {
+        @Parameter(description = "cluster-name", required = true)
+        private java.util.List<String> params;
+
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            print(getAdmin().clusters().getClusterMigration(cluster));
+        }
+    }
+
     @Parameters(commandDescription = "Update cluster migration")
     private class UpdateClusterMigration extends CliCommand {
         @Parameter(description = "cluster-name", required = true)
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
index 6a7110e6507..fffe87a3005 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
@@ -176,17 +176,6 @@ public final class ClusterDataImpl implements  
ClusterData, Cloneable {
             example = ""
     )
     private String listenerName;
-    @ApiModelProperty(
-            name = "migrated",
-            value = "flag to check if cluster is migrated to different 
cluster",
-            example = "true/false"
-    )
-    private boolean migrated;
-    @ApiModelProperty(
-            name = "migratedClusterUrl",
-            value = "url of cluster where current cluster is migrated"
-    )
-    private ClusterUrl migratedClusterUrl;
 
     public static ClusterDataImplBuilder builder() {
         return new ClusterDataImplBuilder();
@@ -216,9 +205,7 @@ public final class ClusterDataImpl implements  ClusterData, 
Cloneable {
                 .brokerClientTrustCertsFilePath(brokerClientTrustCertsFilePath)
                 
.brokerClientCertificateFilePath(brokerClientCertificateFilePath)
                 .brokerClientKeyFilePath(brokerClientKeyFilePath)
-                .listenerName(listenerName)
-                .migrated(migrated)
-                .migratedClusterUrl(migratedClusterUrl);
+                .listenerName(listenerName);
     }
 
     @Data
@@ -245,8 +232,6 @@ public final class ClusterDataImpl implements  ClusterData, 
Cloneable {
         private String brokerClientKeyFilePath;
         private String brokerClientTrustCertsFilePath;
         private String listenerName;
-        private boolean migrated;
-        private ClusterUrl migratedClusterUrl;
 
         ClusterDataImplBuilder() {
         }
@@ -367,16 +352,6 @@ public final class ClusterDataImpl implements  
ClusterData, Cloneable {
             return this;
         }
 
-        public ClusterDataImplBuilder migrated(boolean migrated) {
-            this.migrated = migrated;
-            return this;
-        }
-
-        public ClusterDataImplBuilder migratedClusterUrl(ClusterUrl 
migratedClusterUrl) {
-            this.migratedClusterUrl = migratedClusterUrl;
-            return this;
-        }
-
         public ClusterDataImpl build() {
             return new ClusterDataImpl(
                     serviceUrl,
@@ -400,9 +375,7 @@ public final class ClusterDataImpl implements  ClusterData, 
Cloneable {
                     brokerClientTrustCertsFilePath,
                     brokerClientKeyFilePath,
                     brokerClientCertificateFilePath,
-                    listenerName,
-                    migrated,
-                    migratedClusterUrl);
+                    listenerName);
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java
new file mode 100644
index 00000000000..c8af2dec321
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * The configuration data for a cluster.
+ */
+@ApiModel(
+        value = "ClusterPolicies",
+        description = "The local cluster policies for a cluster"
+)
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public final class ClusterPoliciesImpl implements  ClusterPolicies, Cloneable {
+    @ApiModelProperty(
+            name = "migrated",
+            value = "flag to check if cluster is migrated to different 
cluster",
+            example = "true/false"
+    )
+    private boolean migrated;
+    @ApiModelProperty(
+            name = "migratedClusterUrl",
+            value = "url of cluster where current cluster is migrated"
+    )
+    private ClusterUrl migratedClusterUrl;
+
+    public static ClusterPoliciesImplBuilder builder() {
+        return new ClusterPoliciesImplBuilder();
+    }
+
+    @Override
+    public ClusterPoliciesImplBuilder clone() {
+        return builder()
+                .migrated(migrated)
+                .migratedClusterUrl(migratedClusterUrl);
+    }
+
+    @Data
+    public static class ClusterPoliciesImplBuilder implements 
ClusterPolicies.Builder {
+        private boolean migrated;
+        private ClusterUrl migratedClusterUrl;
+
+        ClusterPoliciesImplBuilder() {
+        }
+
+        public ClusterPoliciesImplBuilder migrated(boolean migrated) {
+            this.migrated = migrated;
+            return this;
+        }
+
+        public ClusterPoliciesImplBuilder migratedClusterUrl(ClusterUrl 
migratedClusterUrl) {
+            this.migratedClusterUrl = migratedClusterUrl;
+            return this;
+        }
+
+        public ClusterPoliciesImpl build() {
+            return new ClusterPoliciesImpl(
+                    migrated,
+                    migratedClusterUrl);
+        }
+    }
+}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
index 87e935ecf73..0bf16166531 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
@@ -53,9 +53,6 @@ public class ClusterDataImplTest {
                 .brokerClientKeyFilePath("/my/key/file")
                 .brokerClientCertificateFilePath("/my/cert/file")
                 .listenerName("a-listener")
-                .migrated(true)
-                .migratedClusterUrl(new 
ClusterData.ClusterUrl("http://remote";, "https://remote";, "pulsar://remote",
-                        "pulsar+ssl://remote"))
                 .build();
 
         ClusterDataImpl clone = originalData.clone().build();

Reply via email to