This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 39b69a3cb08 [fix][broker] Fixed the ExtensibleLoadManagerImpl internal system getTopic failure when the leadership changes #21764 (#21801) 39b69a3cb08 is described below commit 39b69a3cb08265542d631bc54a9b66c3ec579df4 Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Wed Dec 27 07:47:36 2023 -0800 [fix][broker] Fixed the ExtensibleLoadManagerImpl internal system getTopic failure when the leadership changes #21764 (#21801) --- .../extensions/ExtensibleLoadManagerImpl.java | 116 ++++++++++----------- .../extensions/store/LoadDataStore.java | 17 +++ .../store/TableViewLoadDataStoreImpl.java | 30 +++++- .../extensions/ExtensibleLoadManagerImplTest.java | 16 +-- .../extensions/filter/BrokerFilterTestBase.java | 15 +++ .../extensions/scheduler/TransferShedderTest.java | 30 ++++++ .../extensions/store/LoadDataStoreTest.java | 3 + .../strategy/LeastResourceUsageWithWeightTest.java | 15 +++ .../loadbalance/ExtensibleLoadManagerTest.java | 47 ++++++--- 9 files changed, 206 insertions(+), 83 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 37ca29da260..f717286fe5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -279,13 +279,18 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { log.info("Created topic {}.", topic); } catch (PulsarAdminException.ConflictException ex) { if (debug(pulsar.getConfiguration(), log)) { - log.info("Topic {} already exists.", topic, ex); + log.info("Topic {} already exists.", topic); } } catch (PulsarAdminException e) { throw new PulsarServerException(e); } } + private static void createSystemTopics(PulsarService pulsar) throws PulsarServerException { + createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC); + createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); + } + @Override public void start() throws PulsarServerException { if (this.started) { @@ -321,13 +326,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); - createSystemTopic(pulsar, BROKER_LOAD_DATA_STORE_TOPIC); - createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); - try { this.brokerLoadDataStore = LoadDataStoreFactory .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); - this.brokerLoadDataStore.startTableView(); this.topBundlesLoadDataStore = LoadDataStoreFactory .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); } catch (LoadDataStoreException e) { @@ -382,7 +383,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { this.unloadScheduler = new UnloadScheduler( pulsar, pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel, unloadCounter, unloadMetrics); - this.unloadScheduler.start(); this.splitScheduler = new SplitScheduler( pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context); this.splitScheduler.start(); @@ -740,74 +740,74 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { @VisibleForTesting void playLeader() { - if (role != Leader) { - log.info("This broker:{} is changing the role from {} to {}", - pulsar.getLookupServiceAddress(), role, Leader); - int retry = 0; - while (true) { + log.info("This broker:{} is setting the role from {} to {}", + pulsar.getLookupServiceAddress(), role, Leader); + int retry = 0; + while (!Thread.currentThread().isInterrupted()) { + try { + initWaiter.await(); + // Confirm the system topics have been created or create them if they do not exist. + // If the leader has changed, the new leader need to reset + // the local brokerService.topics (by this topic creations). + // Otherwise, the system topic existence check will fail on the leader broker. + createSystemTopics(pulsar); + brokerLoadDataStore.init(); + topBundlesLoadDataStore.init(); + unloadScheduler.start(); + serviceUnitStateChannel.scheduleOwnershipMonitor(); + break; + } catch (Throwable e) { + log.error("The broker:{} failed to set the role. Retrying {} th ...", + pulsar.getLookupServiceAddress(), ++retry, e); try { - initWaiter.await(); - serviceUnitStateChannel.scheduleOwnershipMonitor(); - topBundlesLoadDataStore.startTableView(); - unloadScheduler.start(); - break; - } catch (Throwable e) { - log.error("The broker:{} failed to change the role. Retrying {} th ...", - pulsar.getLookupServiceAddress(), ++retry, e); - try { - Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); - } catch (InterruptedException ex) { - log.warn("Interrupted while sleeping."); - } + Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); + } catch (InterruptedException ex) { + log.warn("Interrupted while sleeping."); + // preserve thread's interrupt status + Thread.currentThread().interrupt(); } } - role = Leader; - log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress()); } + role = Leader; + log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress()); // flush the load data when the leader is elected. - if (brokerLoadDataReporter != null) { - brokerLoadDataReporter.reportAsync(true); - } - if (topBundleLoadDataReporter != null) { - topBundleLoadDataReporter.reportAsync(true); - } + brokerLoadDataReporter.reportAsync(true); + topBundleLoadDataReporter.reportAsync(true); } @VisibleForTesting void playFollower() { - if (role != Follower) { - log.info("This broker:{} is changing the role from {} to {}", - pulsar.getLookupServiceAddress(), role, Follower); - int retry = 0; - while (true) { + log.info("This broker:{} is setting the role from {} to {}", + pulsar.getLookupServiceAddress(), role, Follower); + int retry = 0; + while (!Thread.currentThread().isInterrupted()) { + try { + initWaiter.await(); + unloadScheduler.close(); + serviceUnitStateChannel.cancelOwnershipMonitor(); + brokerLoadDataStore.init(); + topBundlesLoadDataStore.close(); + topBundlesLoadDataStore.startProducer(); + break; + } catch (Throwable e) { + log.error("The broker:{} failed to set the role. Retrying {} th ...", + pulsar.getLookupServiceAddress(), ++retry, e); try { - initWaiter.await(); - serviceUnitStateChannel.cancelOwnershipMonitor(); - topBundlesLoadDataStore.closeTableView(); - unloadScheduler.close(); - break; - } catch (Throwable e) { - log.error("The broker:{} failed to change the role. Retrying {} th ...", - pulsar.getLookupServiceAddress(), ++retry, e); - try { - Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); - } catch (InterruptedException ex) { - log.warn("Interrupted while sleeping."); - } + Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); + } catch (InterruptedException ex) { + log.warn("Interrupted while sleeping."); + // preserve thread's interrupt status + Thread.currentThread().interrupt(); } } - role = Follower; - log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress()); } + role = Follower; + log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress()); // flush the load data when the leader is elected. - if (brokerLoadDataReporter != null) { - brokerLoadDataReporter.reportAsync(true); - } - if (topBundleLoadDataReporter != null) { - topBundleLoadDataReporter.reportAsync(true); - } + brokerLoadDataReporter.reportAsync(true); + topBundleLoadDataReporter.reportAsync(true); } public List<Metrics> getMetrics() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java index 680a36523a2..a7deeeaad8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java @@ -81,9 +81,26 @@ public interface LoadDataStore<T> extends Closeable { */ void closeTableView() throws IOException; + + /** + * Starts the data store (both producer and table view). + */ + void start() throws LoadDataStoreException; + + /** + * Inits the data store (close and start the data store). + */ + void init() throws IOException; + /** * Starts the table view. */ void startTableView() throws LoadDataStoreException; + + /** + * Starts the producer. + */ + void startProducer() throws LoadDataStoreException; + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index a400163ebf1..ead0a7081fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -37,9 +37,9 @@ import org.apache.pulsar.client.api.TableView; */ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> { - private TableView<T> tableView; + private volatile TableView<T> tableView; - private final Producer<T> producer; + private volatile Producer<T> producer; private final PulsarClient client; @@ -50,7 +50,6 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> { public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class<T> clazz) throws LoadDataStoreException { try { this.client = client; - this.producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create(); this.topic = topic; this.clazz = clazz; } catch (Exception e) { @@ -99,6 +98,12 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> { } } + @Override + public void start() throws LoadDataStoreException { + startProducer(); + startTableView(); + } + @Override public void startTableView() throws LoadDataStoreException { if (tableView == null) { @@ -111,14 +116,33 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> { } } + @Override + public void startProducer() throws LoadDataStoreException { + if (producer == null) { + try { + producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create(); + } catch (PulsarClientException e) { + producer = null; + throw new LoadDataStoreException(e); + } + } + } + @Override public void close() throws IOException { if (producer != null) { producer.close(); + producer = null; } closeTableView(); } + @Override + public void init() throws IOException { + close(); + start(); + } + private void validateTableViewStart() { if (tableView == null) { throw new IllegalStateException("table view has not been started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 20ba9500cb1..545601ea59c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -821,12 +821,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondarySpy, true); if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) { - primaryLoadManager.playFollower(); - primaryLoadManager.playFollower(); + primaryLoadManager.playFollower(); // close 3 times + primaryLoadManager.playFollower(); // close 1 time secondaryLoadManager.playLeader(); secondaryLoadManager.playLeader(); - primaryLoadManager.playLeader(); - primaryLoadManager.playLeader(); + primaryLoadManager.playLeader(); // close 3 times and open 3 times + primaryLoadManager.playLeader(); // close 1 time and open 1 time, secondaryLoadManager.playFollower(); secondaryLoadManager.playFollower(); } else { @@ -841,10 +841,10 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { } - verify(topBundlesLoadDataStorePrimarySpy, times(3)).startTableView(); - verify(topBundlesLoadDataStorePrimarySpy, times(3)).closeTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(3)).startTableView(); - verify(topBundlesLoadDataStoreSecondarySpy, times(3)).closeTableView(); + verify(topBundlesLoadDataStorePrimarySpy, times(4)).startTableView(); + verify(topBundlesLoadDataStorePrimarySpy, times(8)).closeTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(4)).startTableView(); + verify(topBundlesLoadDataStoreSecondarySpy, times(8)).closeTableView(); FieldUtils.writeDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStorePrimary, true); FieldUtils.writeDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", topBundlesLoadDataStoreSecondary, true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java index 68bd7b29094..a120ef473e9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java @@ -90,10 +90,25 @@ public class BrokerFilterTestBase { } + @Override + public void start() throws LoadDataStoreException { + + } + + @Override + public void init() throws IOException { + + } + @Override public void startTableView() throws LoadDataStoreException { } + + @Override + public void startProducer() throws LoadDataStoreException { + + } }; configuration.setPreferLaterVersions(true); doReturn(configuration).when(mockContext).brokerConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 26d95a0158d..4eec6124777 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -383,10 +383,25 @@ public class TransferShedderTest { } + @Override + public void start() throws LoadDataStoreException { + + } + + @Override + public void init() throws IOException { + + } + @Override public void startTableView() throws LoadDataStoreException { } + + @Override + public void startProducer() throws LoadDataStoreException { + + } }; var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() { @@ -436,10 +451,25 @@ public class TransferShedderTest { } + @Override + public void start() throws LoadDataStoreException { + + } + + @Override + public void init() throws IOException { + + } + @Override public void startTableView() throws LoadDataStoreException { } + + @Override + public void startProducer() throws LoadDataStoreException { + + } }; BrokerRegistry brokerRegistry = mock(BrokerRegistry.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java index 184c337a47c..7431b9815f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java @@ -75,6 +75,7 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest { @Cleanup LoadDataStore<MyClass> loadDataStore = LoadDataStoreFactory.create(pulsar.getClient(), topic, MyClass.class); + loadDataStore.startProducer(); loadDataStore.startTableView(); MyClass myClass1 = new MyClass("1", 1); loadDataStore.pushAsync("key1", myClass1).get(); @@ -108,6 +109,7 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest { @Cleanup LoadDataStore<Integer> loadDataStore = LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class); + loadDataStore.startProducer(); loadDataStore.startTableView(); Map<String, Integer> map = new HashMap<>(); @@ -132,6 +134,7 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest { String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID(); LoadDataStore<Integer> loadDataStore = LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class); + loadDataStore.startProducer(); loadDataStore.startTableView(); loadDataStore.pushAsync("1", 1).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java index 0eea1d87513..b1e09bf2f3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java @@ -252,10 +252,25 @@ public class LeastResourceUsageWithWeightTest { } + @Override + public void start() throws LoadDataStoreException { + + } + + @Override + public void init() throws IOException { + + } + @Override public void startTableView() throws LoadDataStoreException { } + + @Override + public void startProducer() throws LoadDataStoreException { + + } }; doReturn(conf).when(ctx).brokerConfiguration(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 49e5ae37834..23abf50bdb0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -38,6 +38,7 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import lombok.extern.slf4j.Slf4j; @@ -53,6 +54,7 @@ import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -309,23 +311,40 @@ public class ExtensibleLoadManagerTest extends TestRetrySupport { parameters1.put("min_limit", "1"); parameters1.put("usage_threshold", "100"); - List<String> activeBrokers = admin.brokers().getActiveBrokers(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( + () -> { + List<String> activeBrokers = admin.brokers().getActiveBrokers(); + assertEquals(activeBrokers.size(), NUM_BROKERS); + } + ); + try { + admin.namespaces().createNamespace(isolationEnabledNameSpace); + } catch (PulsarAdminException.ConflictException e) { + //expected when retried + } - assertEquals(activeBrokers.size(), NUM_BROKERS); + try { + admin.clusters() + .createNamespaceIsolationPolicy(clusterName, namespaceIsolationPolicyName, NamespaceIsolationData + .builder() + .namespaces(List.of(isolationEnabledNameSpace)) + .autoFailoverPolicy(AutoFailoverPolicyData.builder() + .policyType(AutoFailoverPolicyType.min_available) + .parameters(parameters1) + .build()) + .primary(List.of(getHostName(0))) + .secondary(List.of(getHostName(1))) + .build()); + } catch (PulsarAdminException.ConflictException e) { + //expected when retried + } - admin.namespaces().createNamespace(isolationEnabledNameSpace); - admin.clusters().createNamespaceIsolationPolicy(clusterName, namespaceIsolationPolicyName, NamespaceIsolationData - .builder() - .namespaces(List.of(isolationEnabledNameSpace)) - .autoFailoverPolicy(AutoFailoverPolicyData.builder() - .policyType(AutoFailoverPolicyType.min_available) - .parameters(parameters1) - .build()) - .primary(List.of(getHostName(0))) - .secondary(List.of(getHostName(1))) - .build()); final String topic = "persistent://" + isolationEnabledNameSpace + "/topic"; - admin.topics().createNonPartitionedTopic(topic); + try { + admin.topics().createNonPartitionedTopic(topic); + } catch (PulsarAdminException.ConflictException e) { + //expected when retried + } String broker = admin.lookups().lookupTopic(topic);