This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 39b69a3cb08 [fix][broker] Fixed the ExtensibleLoadManagerImpl internal system getTopic failure when the leadership changes #21764 (#21801)
39b69a3cb08 is described below

commit 39b69a3cb08265542d631bc54a9b66c3ec579df4
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Wed Dec 27 07:47:36 2023 -0800

    [fix][broker] Fixed the ExtensibleLoadManagerImpl internal system getTopic failure when the leadership changes #21764 (#21801)
---
 .../extensions/ExtensibleLoadManagerImpl.java      | 116 ++++++++++-----------
 .../extensions/store/LoadDataStore.java            |  17 +++
 .../store/TableViewLoadDataStoreImpl.java          |  30 +++++-
 .../extensions/ExtensibleLoadManagerImplTest.java  |  16 +--
 .../extensions/filter/BrokerFilterTestBase.java    |  15 +++
 .../extensions/scheduler/TransferShedderTest.java  |  30 ++++++
 .../extensions/store/LoadDataStoreTest.java        |   3 +
 .../strategy/LeastResourceUsageWithWeightTest.java |  15 +++
 .../loadbalance/ExtensibleLoadManagerTest.java     |  47 ++++++---
 9 files changed, 206 insertions(+), 83 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 37ca29da260..f717286fe5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -279,13 +279,18 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
             log.info("Created topic {}.", topic);
         } catch (PulsarAdminException.ConflictException ex) {
             if (debug(pulsar.getConfiguration(), log)) {
-                log.info("Topic {} already exists.", topic, ex);
+                log.info("Topic {} already exists.", topic);
             }
         } catch (PulsarAdminException e) {
             throw new PulsarServerException(e);
         }
     }
 
+    private static void createSystemTopics(PulsarService pulsar) throws PulsarServerException {
+        createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
+        createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+    }
+
     @Override
     public void start() throws PulsarServerException {
         if (this.started) {
@@ -321,13 +326,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
             this.isolationPoliciesHelper = new 
IsolationPoliciesHelper(policies);
             this.brokerFilterPipeline.add(new 
BrokerIsolationPoliciesFilter(isolationPoliciesHelper));
 
-            createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC);
-            createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
-
             try {
                 this.brokerLoadDataStore = LoadDataStoreFactory
                         .create(pulsar.getClient(), 
BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
-                this.brokerLoadDataStore.startTableView();
                 this.topBundlesLoadDataStore = LoadDataStoreFactory
                         .create(pulsar.getClient(), 
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
             } catch (LoadDataStoreException e) {
@@ -382,7 +383,6 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
             this.unloadScheduler = new UnloadScheduler(
                     pulsar, pulsar.getLoadManagerExecutor(), unloadManager, 
context,
                     serviceUnitStateChannel, unloadCounter, unloadMetrics);
-            this.unloadScheduler.start();
             this.splitScheduler = new SplitScheduler(
                     pulsar, serviceUnitStateChannel, splitManager, 
splitCounter, splitMetrics, context);
             this.splitScheduler.start();
@@ -740,74 +740,74 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
 
     @VisibleForTesting
     void playLeader() {
-        if (role != Leader) {
-            log.info("This broker:{} is changing the role from {} to {}",
-                    pulsar.getLookupServiceAddress(), role, Leader);
-            int retry = 0;
-            while (true) {
+        log.info("This broker:{} is setting the role from {} to {}",
+                pulsar.getLookupServiceAddress(), role, Leader);
+        int retry = 0;
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                initWaiter.await();
+                // Confirm the system topics have been created or create them if they do not exist.
+                // If the leader has changed, the new leader need to reset
+                // the local brokerService.topics (by this topic creations).
+                // Otherwise, the system topic existence check will fail on the leader broker.
+                createSystemTopics(pulsar);
+                brokerLoadDataStore.init();
+                topBundlesLoadDataStore.init();
+                unloadScheduler.start();
+                serviceUnitStateChannel.scheduleOwnershipMonitor();
+                break;
+            } catch (Throwable e) {
+                log.error("The broker:{} failed to set the role. Retrying {} th ...",
+                        pulsar.getLookupServiceAddress(), ++retry, e);
                 try {
-                    initWaiter.await();
-                    serviceUnitStateChannel.scheduleOwnershipMonitor();
-                    topBundlesLoadDataStore.startTableView();
-                    unloadScheduler.start();
-                    break;
-                } catch (Throwable e) {
-                    log.error("The broker:{} failed to change the role. 
Retrying {} th ...",
-                            pulsar.getLookupServiceAddress(), ++retry, e);
-                    try {
-                        Thread.sleep(Math.min(retry * 10, 
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
-                    } catch (InterruptedException ex) {
-                        log.warn("Interrupted while sleeping.");
-                    }
+                    Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
+                } catch (InterruptedException ex) {
+                    log.warn("Interrupted while sleeping.");
+                    // preserve thread's interrupt status
+                    Thread.currentThread().interrupt();
                 }
             }
-            role = Leader;
-            log.info("This broker:{} plays the leader now.", 
pulsar.getLookupServiceAddress());
         }
+        role = Leader;
+        log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress());
 
         // flush the load data when the leader is elected.
-        if (brokerLoadDataReporter != null) {
-            brokerLoadDataReporter.reportAsync(true);
-        }
-        if (topBundleLoadDataReporter != null) {
-            topBundleLoadDataReporter.reportAsync(true);
-        }
+        brokerLoadDataReporter.reportAsync(true);
+        topBundleLoadDataReporter.reportAsync(true);
     }
 
     @VisibleForTesting
     void playFollower() {
-        if (role != Follower) {
-            log.info("This broker:{} is changing the role from {} to {}",
-                    pulsar.getLookupServiceAddress(), role, Follower);
-            int retry = 0;
-            while (true) {
+        log.info("This broker:{} is setting the role from {} to {}",
+                pulsar.getLookupServiceAddress(), role, Follower);
+        int retry = 0;
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                initWaiter.await();
+                unloadScheduler.close();
+                serviceUnitStateChannel.cancelOwnershipMonitor();
+                brokerLoadDataStore.init();
+                topBundlesLoadDataStore.close();
+                topBundlesLoadDataStore.startProducer();
+                break;
+            } catch (Throwable e) {
+                log.error("The broker:{} failed to set the role. Retrying {} th ...",
+                        pulsar.getLookupServiceAddress(), ++retry, e);
                 try {
-                    initWaiter.await();
-                    serviceUnitStateChannel.cancelOwnershipMonitor();
-                    topBundlesLoadDataStore.closeTableView();
-                    unloadScheduler.close();
-                    break;
-                } catch (Throwable e) {
-                    log.error("The broker:{} failed to change the role. 
Retrying {} th ...",
-                            pulsar.getLookupServiceAddress(), ++retry, e);
-                    try {
-                        Thread.sleep(Math.min(retry * 10, 
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
-                    } catch (InterruptedException ex) {
-                        log.warn("Interrupted while sleeping.");
-                    }
+                    Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
+                } catch (InterruptedException ex) {
+                    log.warn("Interrupted while sleeping.");
+                    // preserve thread's interrupt status
+                    Thread.currentThread().interrupt();
                 }
             }
-            role = Follower;
-            log.info("This broker:{} plays a follower now.", 
pulsar.getLookupServiceAddress());
         }
+        role = Follower;
+        log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress());
 
         // flush the load data when the leader is elected.
-        if (brokerLoadDataReporter != null) {
-            brokerLoadDataReporter.reportAsync(true);
-        }
-        if (topBundleLoadDataReporter != null) {
-            topBundleLoadDataReporter.reportAsync(true);
-        }
+        brokerLoadDataReporter.reportAsync(true);
+        topBundleLoadDataReporter.reportAsync(true);
     }
 
     public List<Metrics> getMetrics() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
index 680a36523a2..a7deeeaad8a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java
@@ -81,9 +81,26 @@ public interface LoadDataStore<T> extends Closeable {
      */
     void closeTableView() throws IOException;
 
+
+    /**
+     * Starts the data store (both producer and table view).
+     */
+    void start() throws LoadDataStoreException;
+
+    /**
+     * Inits the data store (close and start the data store).
+     */
+    void init() throws IOException;
+
     /**
      * Starts the table view.
      */
     void startTableView() throws LoadDataStoreException;
 
+
+    /**
+     * Starts the producer.
+     */
+    void startProducer() throws LoadDataStoreException;
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index a400163ebf1..ead0a7081fd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -37,9 +37,9 @@ import org.apache.pulsar.client.api.TableView;
  */
 public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
 
-    private TableView<T> tableView;
+    private volatile TableView<T> tableView;
 
-    private final Producer<T> producer;
+    private volatile Producer<T> producer;
 
     private final PulsarClient client;
 
@@ -50,7 +50,6 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
     public TableViewLoadDataStoreImpl(PulsarClient client, String topic, 
Class<T> clazz) throws LoadDataStoreException {
         try {
             this.client = client;
-            this.producer = 
client.newProducer(Schema.JSON(clazz)).topic(topic).create();
             this.topic = topic;
             this.clazz = clazz;
         } catch (Exception e) {
@@ -99,6 +98,12 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
         }
     }
 
+    @Override
+    public void start() throws LoadDataStoreException {
+        startProducer();
+        startTableView();
+    }
+
     @Override
     public void startTableView() throws LoadDataStoreException {
         if (tableView == null) {
@@ -111,14 +116,33 @@ public class TableViewLoadDataStoreImpl<T> implements 
LoadDataStore<T> {
         }
     }
 
+    @Override
+    public void startProducer() throws LoadDataStoreException {
+        if (producer == null) {
+            try {
+                producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
+            } catch (PulsarClientException e) {
+                producer = null;
+                throw new LoadDataStoreException(e);
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (producer != null) {
             producer.close();
+            producer = null;
         }
         closeTableView();
     }
 
+    @Override
+    public void init() throws IOException {
+        close();
+        start();
+    }
+
     private void validateTableViewStart() {
         if (tableView == null) {
             throw new IllegalStateException("table view has not been started");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 20ba9500cb1..545601ea59c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -821,12 +821,12 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         FieldUtils.writeDeclaredField(secondaryLoadManager, 
"topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true);
 
         if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
-            primaryLoadManager.playFollower();
-            primaryLoadManager.playFollower();
+            primaryLoadManager.playFollower(); // close 3 times
+            primaryLoadManager.playFollower(); // close 1 time
             secondaryLoadManager.playLeader();
             secondaryLoadManager.playLeader();
-            primaryLoadManager.playLeader();
-            primaryLoadManager.playLeader();
+            primaryLoadManager.playLeader(); // close 3 times and open 3 times
+            primaryLoadManager.playLeader(); // close 1 time and open 1 time,
             secondaryLoadManager.playFollower();
             secondaryLoadManager.playFollower();
         } else {
@@ -841,10 +841,10 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         }
 
 
-        verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView();
-        verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView();
-        verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView();
-        verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView();
+        verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView();
+        verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView();
+        verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView();
+        verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView();
 
         FieldUtils.writeDeclaredField(primaryLoadManager, 
"topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true);
         FieldUtils.writeDeclaredField(secondaryLoadManager, 
"topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
index 68bd7b29094..a120ef473e9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java
@@ -90,10 +90,25 @@ public class BrokerFilterTestBase {
 
             }
 
+            @Override
+            public void start() throws LoadDataStoreException {
+
+            }
+
+            @Override
+            public void init() throws IOException {
+
+            }
+
             @Override
             public void startTableView() throws LoadDataStoreException {
 
             }
+
+            @Override
+            public void startProducer() throws LoadDataStoreException {
+
+            }
         };
         configuration.setPreferLaterVersions(true);
         doReturn(configuration).when(mockContext).brokerConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 26d95a0158d..4eec6124777 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -383,10 +383,25 @@ public class TransferShedderTest {
 
             }
 
+            @Override
+            public void start() throws LoadDataStoreException {
+
+            }
+
+            @Override
+            public void init() throws IOException {
+
+            }
+
             @Override
             public void startTableView() throws LoadDataStoreException {
 
             }
+
+            @Override
+            public void startProducer() throws LoadDataStoreException {
+
+            }
         };
 
         var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() {
@@ -436,10 +451,25 @@ public class TransferShedderTest {
 
             }
 
+            @Override
+            public void start() throws LoadDataStoreException {
+
+            }
+
+            @Override
+            public void init() throws IOException {
+
+            }
+
             @Override
             public void startTableView() throws LoadDataStoreException {
 
             }
+
+            @Override
+            public void startProducer() throws LoadDataStoreException {
+
+            }
         };
 
         BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index 184c337a47c..7431b9815f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -75,6 +75,7 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         @Cleanup
         LoadDataStore<MyClass> loadDataStore =
                 LoadDataStoreFactory.create(pulsar.getClient(), topic, 
MyClass.class);
+        loadDataStore.startProducer();
         loadDataStore.startTableView();
         MyClass myClass1 = new MyClass("1", 1);
         loadDataStore.pushAsync("key1", myClass1).get();
@@ -108,6 +109,7 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         @Cleanup
         LoadDataStore<Integer> loadDataStore =
                 LoadDataStoreFactory.create(pulsar.getClient(), topic, 
Integer.class);
+        loadDataStore.startProducer();
         loadDataStore.startTableView();
 
         Map<String, Integer> map = new HashMap<>();
@@ -132,6 +134,7 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         String topic = TopicDomain.persistent + "://" + 
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
         LoadDataStore<Integer> loadDataStore =
                 LoadDataStoreFactory.create(pulsar.getClient(), topic, 
Integer.class);
+        loadDataStore.startProducer();
 
         loadDataStore.startTableView();
         loadDataStore.pushAsync("1", 1).get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index 0eea1d87513..b1e09bf2f3a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -252,10 +252,25 @@ public class LeastResourceUsageWithWeightTest {
 
             }
 
+            @Override
+            public void start() throws LoadDataStoreException {
+
+            }
+
+            @Override
+            public void init() throws IOException {
+
+            }
+
             @Override
             public void startTableView() throws LoadDataStoreException {
 
             }
+
+            @Override
+            public void startProducer() throws LoadDataStoreException {
+
+            }
         };
 
         doReturn(conf).when(ctx).brokerConfiguration();
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 49e5ae37834..23abf50bdb0 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -38,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
@@ -53,6 +54,7 @@ import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -309,23 +311,40 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
         parameters1.put("min_limit", "1");
         parameters1.put("usage_threshold", "100");
 
-        List<String> activeBrokers = admin.brokers().getActiveBrokers();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+                () -> {
+                    List<String> activeBrokers = 
admin.brokers().getActiveBrokers();
+                    assertEquals(activeBrokers.size(), NUM_BROKERS);
+                }
+        );
+        try {
+            admin.namespaces().createNamespace(isolationEnabledNameSpace);
+        } catch (PulsarAdminException.ConflictException e) {
+            //expected when retried
+        }
 
-        assertEquals(activeBrokers.size(), NUM_BROKERS);
+        try {
+            admin.clusters()
+                    .createNamespaceIsolationPolicy(clusterName, 
namespaceIsolationPolicyName, NamespaceIsolationData
+                            .builder()
+                            .namespaces(List.of(isolationEnabledNameSpace))
+                            
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
+                                    
.policyType(AutoFailoverPolicyType.min_available)
+                                    .parameters(parameters1)
+                                    .build())
+                            .primary(List.of(getHostName(0)))
+                            .secondary(List.of(getHostName(1)))
+                            .build());
+        } catch (PulsarAdminException.ConflictException e) {
+            //expected when retried
+        }
 
-        admin.namespaces().createNamespace(isolationEnabledNameSpace);
-        admin.clusters().createNamespaceIsolationPolicy(clusterName, 
namespaceIsolationPolicyName, NamespaceIsolationData
-                .builder()
-                .namespaces(List.of(isolationEnabledNameSpace))
-                .autoFailoverPolicy(AutoFailoverPolicyData.builder()
-                        .policyType(AutoFailoverPolicyType.min_available)
-                        .parameters(parameters1)
-                        .build())
-                .primary(List.of(getHostName(0)))
-                .secondary(List.of(getHostName(1)))
-                .build());
         final String topic = "persistent://" + isolationEnabledNameSpace + 
"/topic";
-        admin.topics().createNonPartitionedTopic(topic);
+        try {
+            admin.topics().createNonPartitionedTopic(topic);
+        } catch (PulsarAdminException.ConflictException e) {
+            //expected when retried
+        }
 
         String broker = admin.lookups().lookupTopic(topic);
 

Reply via email to