This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4366a13ae2f [fix][broker][branch-3.1] Fix broker not starting when both transactions and the Extensible Load Manager are enabled (#22194)
4366a13ae2f is described below

commit 4366a13ae2fa96db14dccc0767a81eb3f01af51f
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Tue Mar 5 15:10:32 2024 -0800

    [fix][broker][branch-3.1] Fix broker not starting when both transactions and the Extensible Load Manager are enabled (#22194)
---
 .../service/persistent/PersistentSubscription.java |   4 +-
 .../broker/service/persistent/PersistentTopic.java |   4 +-
 .../ExtensibleLoadManagerImplBaseTest.java         | 160 +++++++++++++++++++++
 .../extensions/ExtensibleLoadManagerImplTest.java  | 123 +---------------
 ...dManagerImplWithTransactionCoordinatorTest.java |  55 +++++++
 .../integration/messaging/MessagingSmokeTest.java  | 107 ++++++++++++++
 .../integration/topologies/PulsarCluster.java      |   2 +-
 .../topologies/PulsarClusterTestBase.java          |  15 ++
 .../src/test/resources/pulsar-messaging.xml        |   1 +
 9 files changed, 350 insertions(+), 121 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index b5852610c20..f00c95f7e68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -58,6 +58,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.service.AbstractSubscription;
 import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -155,7 +156,8 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
                 ? Collections.emptyMap() : 
Collections.unmodifiableMap(subscriptionProperties);
         if 
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
-                && !isEventSystemTopic(TopicName.get(topicName))) {
+                && !isEventSystemTopic(TopicName.get(topicName))
+                && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
         } else {
             this.pendingAckHandle = new PendingAckHandleDisabled();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2752f247853..b434d8a2dbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -83,6 +83,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -306,7 +307,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         TopicName topicName = TopicName.get(topic);
         if 
(brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
                 && !isEventSystemTopic(topicName)
-                && 
!NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+                && 
!NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
+                && !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
             this.transactionBuffer = brokerService.getPulsar()
                     .getTransactionBufferProvider().newTransactionBuffer(this);
         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
new file mode 100644
index 00000000000..d9c6f78b8d0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+public abstract class ExtensibleLoadManagerImplBaseTest extends 
MockedPulsarServiceBaseTest {
+
+    protected PulsarService pulsar1;
+    protected PulsarService pulsar2;
+
+    protected PulsarTestContext additionalPulsarTestContext;
+
+    protected ExtensibleLoadManagerImpl primaryLoadManager;
+
+    protected ExtensibleLoadManagerImpl secondaryLoadManager;
+
+    protected ServiceUnitStateChannelImpl channel1;
+    protected ServiceUnitStateChannelImpl channel2;
+
+    protected final String defaultTestNamespace;
+
+    protected LookupService lookupService;
+
+    protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
+        this.defaultTestNamespace = defaultTestNamespace;
+    }
+
+    protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
+        conf.setForceDeleteNamespaceAllowed(true);
+        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        conf.setAllowAutoTopicCreation(true);
+        
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+        conf.setLoadBalancerSheddingEnabled(false);
+        conf.setLoadBalancerDebugModeEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        return conf;
+    }
+
+    @Override
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        initConfig(conf);
+        super.internalSetup(conf);
+        pulsar1 = pulsar;
+        var conf2 = initConfig(getDefaultConf());
+        additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
+        pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+        setPrimaryLoadManager();
+        setSecondaryLoadManager();
+
+        admin.clusters().createCluster(this.conf.getClusterName(),
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+                        Sets.newHashSet(this.conf.getClusterName())));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default",
+                Sets.newHashSet(this.conf.getClusterName()));
+
+        admin.namespaces().createNamespace(defaultTestNamespace, 128);
+        
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+                Sets.newHashSet(this.conf.getClusterName()));
+        lookupService = (LookupService) 
FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        this.additionalPulsarTestContext.close();
+        super.internalCleanup();
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    protected void initializeState() throws PulsarAdminException, 
IllegalAccessException {
+        admin.namespaces().unload(defaultTestNamespace);
+        reset(primaryLoadManager, secondaryLoadManager);
+        FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, 
true);
+    }
+
+    protected void setPrimaryLoadManager() throws IllegalAccessException {
+        ExtensibleLoadManagerWrapper wrapper =
+                (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
+        primaryLoadManager = spy((ExtensibleLoadManagerImpl)
+                FieldUtils.readField(wrapper, "loadManager", true));
+        FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, 
true);
+        channel1 = (ServiceUnitStateChannelImpl)
+                FieldUtils.readField(primaryLoadManager, 
"serviceUnitStateChannel", true);
+    }
+
+    private void setSecondaryLoadManager() throws IllegalAccessException {
+        ExtensibleLoadManagerWrapper wrapper =
+                (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
+        secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
+                FieldUtils.readField(wrapper, "loadManager", true));
+        FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, 
true);
+        channel2 = (ServiceUnitStateChannelImpl)
+                FieldUtils.readField(secondaryLoadManager, 
"serviceUnitStateChannel", true);
+    }
+
+    protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService 
pulsar, TopicName topic) {
+        return pulsar.getNamespaceService().getBundleAsync(topic);
+    }
+
+    protected Pair<TopicName, NamespaceBundle> 
getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
+            throws Exception {
+        TopicName changeEventsTopicName =
+                TopicName.get(defaultTestNamespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, 
changeEventsTopicName).get();
+        int i = 0;
+        while(true) {
+            TopicName topicName = TopicName.get(defaultTestNamespace + "/" + 
topicNamePrefix + "-" + i);
+            NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+            if (!bundle.equals(changeEventsBundle)) {
+                return Pair.of(topicName, bundle);
+            }
+            i++;
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 850bf9a96c8..61621d14de6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -75,7 +75,6 @@ import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
@@ -101,21 +100,16 @@ import 
org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
-import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerAssignment;
 import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -123,9 +117,6 @@ import 
org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.awaitility.Awaitility;
 import org.mockito.MockedStatic;
 import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
@@ -133,75 +124,11 @@ import org.testng.annotations.Test;
  */
 @Slf4j
 @Test(groups = "broker")
-public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest 
{
+@SuppressWarnings("unchecked")
+public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBaseTest {
 
-    private PulsarService pulsar1;
-    private PulsarService pulsar2;
-
-    private PulsarTestContext additionalPulsarTestContext;
-
-    private ExtensibleLoadManagerImpl primaryLoadManager;
-
-    private ExtensibleLoadManagerImpl secondaryLoadManager;
-
-    private ServiceUnitStateChannelImpl channel1;
-    private ServiceUnitStateChannelImpl channel2;
-
-    private final String defaultTestNamespace = "public/test";
-
-    private static void initConfig(ServiceConfiguration conf){
-        conf.setForceDeleteNamespaceAllowed(true);
-        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-        conf.setAllowAutoTopicCreation(true);
-        
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-        
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-        conf.setLoadBalancerSheddingEnabled(false);
-        conf.setLoadBalancerDebugModeEnabled(true);
-        conf.setTopicLevelPoliciesEnabled(true);
-    }
-
-    @BeforeClass
-    @Override
-    public void setup() throws Exception {
-        // Set the inflight state waiting time and ownership monitor delay 
time to 5 seconds to avoid
-        // stuck when doing unload.
-        initConfig(conf);
-        super.internalSetup(conf);
-        pulsar1 = pulsar;
-        ServiceConfiguration defaultConf = getDefaultConf();
-        initConfig(defaultConf);
-        additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf);
-        pulsar2 = additionalPulsarTestContext.getPulsarService();
-
-        setPrimaryLoadManager();
-
-        setSecondaryLoadManager();
-
-        admin.clusters().createCluster(this.conf.getClusterName(),
-                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
-                        Sets.newHashSet(this.conf.getClusterName())));
-        admin.namespaces().createNamespace("public/default");
-        admin.namespaces().setNamespaceReplicationClusters("public/default",
-                Sets.newHashSet(this.conf.getClusterName()));
-
-        admin.namespaces().createNamespace(defaultTestNamespace);
-        
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
-                Sets.newHashSet(this.conf.getClusterName()));
-    }
-
-    @Override
-    @AfterClass(alwaysRun = true)
-    protected void cleanup() throws Exception {
-        this.additionalPulsarTestContext.close();
-        super.internalCleanup();
-    }
-
-    @BeforeMethod(alwaysRun = true)
-    protected void initializeState() throws PulsarAdminException {
-        admin.namespaces().unload(defaultTestNamespace);
-        reset(primaryLoadManager, secondaryLoadManager);
+    public ExtensibleLoadManagerImplTest() {
+        super("public/test");
     }
 
     @Test
@@ -459,7 +386,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception 
{
         String namespace = defaultTestNamespace;
         String topic = "persistent://" + namespace + 
"/test-split-with-specific-position";
-        admin.topics().createPartitionedTopic(topic, 10);
+        admin.topics().createPartitionedTopic(topic, 1024);
         BundlesData bundles = admin.namespaces().getBundles(namespace);
         int numBundles = bundles.getNumBundles();
 
@@ -1320,44 +1247,4 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         }
 
     }
-
-    private void setPrimaryLoadManager() throws IllegalAccessException {
-        ExtensibleLoadManagerWrapper wrapper =
-                (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
-        primaryLoadManager = spy((ExtensibleLoadManagerImpl)
-                FieldUtils.readField(wrapper, "loadManager", true));
-        FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, 
true);
-        channel1 = (ServiceUnitStateChannelImpl)
-                FieldUtils.readField(primaryLoadManager, 
"serviceUnitStateChannel", true);
-    }
-
-    private void setSecondaryLoadManager() throws IllegalAccessException {
-        ExtensibleLoadManagerWrapper wrapper =
-                (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
-        secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
-                FieldUtils.readField(wrapper, "loadManager", true));
-        FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, 
true);
-        channel2 = (ServiceUnitStateChannelImpl)
-                FieldUtils.readField(secondaryLoadManager, 
"serviceUnitStateChannel", true);
-    }
-
-    private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService 
pulsar, TopicName topic) {
-        return pulsar.getNamespaceService().getBundleAsync(topic);
-    }
-
-    private Pair<TopicName, NamespaceBundle> 
getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
-            throws Exception {
-        TopicName changeEventsTopicName =
-                TopicName.get(defaultTestNamespace + "/" + 
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
-        NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, 
changeEventsTopicName).get();
-        int i = 0;
-        while (true) {
-            TopicName topicName = TopicName.get(defaultTestNamespace + "/" + 
topicNamePrefix + "-" + i);
-            NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
-            if (!bundle.equals(changeEventsBundle)) {
-                return Pair.of(topicName, bundle);
-            }
-            i++;
-        }
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
new file mode 100644
index 00000000000..0c95dd85f28
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ExtensibleLoadManagerImplWithTransactionCoordinatorTest extends 
ExtensibleLoadManagerImplBaseTest {
+
+    public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() {
+        super("public/test-elb-with-tx");
+    }
+
+    @Override
+    protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
+        conf = super.initConfig(conf);
+        conf.setTransactionCoordinatorEnabled(true);
+        return conf;
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadAdminAPI() throws Exception {
+        var topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-unload");
+        var topicName = topicAndBundle.getLeft();
+        var bundle = topicAndBundle.getRight();
+
+        var srcBroker = admin.lookups().lookupTopic(topicName.toString());
+        var dstBroker = srcBroker.equals(pulsar1.getBrokerServiceUrl()) ? 
pulsar2 : pulsar1;
+        var dstBrokerUrl = dstBroker.getBrokerId();
+        var dstBrokerServiceUrl = dstBroker.getBrokerServiceUrl();
+
+        admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange(), dstBrokerUrl);
+        Awaitility.await().untilAsserted(
+                () -> 
assertEquals(admin.lookups().lookupTopic(topicName.toString()), 
dstBrokerServiceUrl));
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
new file mode 100644
index 00000000000..618053ac000
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.testng.ITest;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+public class MessagingSmokeTest extends TopicMessagingBase implements ITest {
+
+    @Factory
+    public static Object[] messagingTests() {
+        List<?> tests = List.of(
+                new MessagingSmokeTest("Extensible Load Manager",
+                        Map.of("loadManagerClassName", 
ExtensibleLoadManagerImpl.class.getName(),
+                                "loadBalancerLoadSheddingStrategy", 
TransferShedder.class.getName())),
+                new MessagingSmokeTest("Extensible Load Manager with TX 
Coordinator",
+                        Map.of("loadManagerClassName", 
ExtensibleLoadManagerImpl.class.getName(),
+                                "loadBalancerLoadSheddingStrategy", 
TransferShedder.class.getName(),
+                                "transactionCoordinatorEnabled", "true"))
+        );
+        return tests.toArray();
+    }
+
+    private final String name;
+
+    public MessagingSmokeTest(String name, Map<String, String> brokerEnvs) {
+        super();
+        this.brokerEnvs.putAll(brokerEnvs);
+        this.name = name;
+    }
+
+    @Override
+    public String getTestName() {
+        return name;
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testNonPartitionedTopicMessagingWithExclusive(Supplier<String> 
serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), 
TopicDomain.persistent.equals(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testPartitionedTopicMessagingWithExclusive(Supplier<String> 
serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), 
TopicDomain.persistent.equals(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testNonPartitionedTopicMessagingWithFailover(Supplier<String> 
serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), 
TopicDomain.persistent.equals(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testPartitionedTopicMessagingWithFailover(Supplier<String> 
serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), 
TopicDomain.persistent.equals(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testNonPartitionedTopicMessagingWithShared(Supplier<String> 
serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), 
TopicDomain.persistent.equals(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testPartitionedTopicMessagingWithShared(Supplier<String> 
serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), 
TopicDomain.persistent.equals(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testNonPartitionedTopicMessagingWithKeyShared(Supplier<String> 
serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), 
TopicDomain.persistent.equals(topicDomain));
+    }
+
+    @Test(dataProvider = "serviceUrlAndTopicDomain")
+    public void testPartitionedTopicMessagingWithKeyShared(Supplier<String> 
serviceUrl, TopicDomain topicDomain)
+            throws Exception {
+        partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), 
TopicDomain.persistent.equals(topicDomain));
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index e6a425956cf..8a759faf76b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -133,7 +133,7 @@ public class PulsarCluster {
         this.brokerContainers = Maps.newTreeMap();
         this.workerContainers = Maps.newTreeMap();
 
-        this.proxyContainer = new 
ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, 
spec.enableTls)
+        this.proxyContainer = new ProxyContainer(clusterName, 
appendClusterName(ProxyContainer.NAME), spec.enableTls)
                 .withNetwork(network)
                 .withNetworkAliases(appendClusterName("pulsar-proxy"))
                 .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index ae9e44fa982..93e2221ab24 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.testng.annotations.DataProvider;
 
 import java.util.stream.Stream;
@@ -86,6 +87,20 @@ public abstract class PulsarClusterTestBase extends 
PulsarTestBase {
         };
     }
 
+    @DataProvider
+    public Object[][] serviceUrlAndTopicDomain() {
+        return new Object[][] {
+                {
+                        stringSupplier(() -> 
getPulsarCluster().getPlainTextServiceUrl()),
+                        TopicDomain.persistent
+                },
+                {
+                        stringSupplier(() -> 
getPulsarCluster().getPlainTextServiceUrl()),
+                        TopicDomain.non_persistent
+                },
+        };
+    }
+
     protected PulsarAdmin pulsarAdmin;
 
     protected PulsarCluster pulsarCluster;
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index cfbdb225870..c6cd900d791 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -28,6 +28,7 @@
             <class 
name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
             <class 
name="org.apache.pulsar.tests.integration.messaging.ReaderMessagingTest" />
             <class 
name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest"
 />
+            <class 
name="org.apache.pulsar.tests.integration.messaging.MessagingSmokeTest" />
             <class name="org.apache.pulsar.tests.integration.admin.AdminTest" 
/>
         </classes>
     </test>

Reply via email to