This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cb703ca  Optimize Tests (#12560)
cb703ca is described below

commit cb703cab9b58bc7fe2da5694e2404538c44dab7a
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Mon Nov 1 23:36:38 2021 +0800

    Optimize Tests (#12560)
---
 .../TopicTransactionBufferRecoverTest.java         | 36 +----------
 .../TransactionClientReconnectTest.java            | 31 +---------
 .../broker/transaction/TransactionProduceTest.java | 42 ++-----------
 .../pulsar/broker/transaction/TransactionTest.java | 26 +-------
 .../broker/transaction/TransactionTestBase.java    | 69 ++++++++++++++++------
 .../buffer/TransactionLowWaterMarkTest.java        | 39 +-----------
 .../buffer/TransactionStablePositionTest.java      | 26 +-------
 .../TransactionMetaStoreAssignmentTest.java        | 14 +----
 .../pendingack/PendingAckInMemoryDeleteTest.java   | 43 +-------------
 .../pendingack/PendingAckPersistentTest.java       | 35 ++---------
 .../client/impl/TransactionEndToEndTest.java       | 38 +-----------
 11 files changed, 72 insertions(+), 327 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 3607b45..335cecc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.transaction;
 
-import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -55,10 +53,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.EventsTopicNames;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
@@ -75,8 +70,6 @@ import static org.testng.Assert.assertTrue;
 @Slf4j
 public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String RECOVER_COMMIT = NAMESPACE1 + 
"/recover-commit";
     private static final String RECOVER_ABORT = NAMESPACE1 + "/recover-abort";
     private static final String SUBSCRIPTION_NAME = "test-recover";
@@ -85,36 +78,9 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:"; + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createNonPartitionedTopic(RECOVER_COMMIT);
+        setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
         admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
         admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
-
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 NUM_PARTITIONS);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
-
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
index 1f5ab15..41f98c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
@@ -18,19 +18,13 @@
  */
 package org.apache.pulsar.broker.transaction;
 
-import com.google.common.collect.Sets;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
@@ -49,33 +43,12 @@ import static org.testng.FileAssert.fail;
 
 public class TransactionClientReconnectTest extends TransactionTestBase {
 
-    private static final String RECONNECT_TOPIC = 
"persistent://public/txn/txn-client-reconnect-test";
+    private static final String RECONNECT_TOPIC = NAMESPACE1 + 
"/txn-client-reconnect-test";
     private static final int NUM_PARTITIONS = 1;
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
-        setBrokerCount(1);
-        super.internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:"; + webServicePort).build());
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet(), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace("public/txn", 10);
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        admin.topics().createNonPartitionedTopic(RECONNECT_TOPIC);
+        setUpBase(1, NUM_PARTITIONS, RECONNECT_TOPIC, 0);
         admin.topics().createSubscription(RECONNECT_TOPIC, "test", 
MessageId.latest);
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 NUM_PARTITIONS);
-
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0e63f53..cbae03b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -50,10 +49,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -69,9 +65,6 @@ import org.testng.annotations.Test;
 public class TransactionProduceTest extends TransactionTestBase {
 
     private static final int TOPIC_PARTITION = 3;
-
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String PRODUCE_COMMIT_TOPIC = NAMESPACE1 + 
"/produce-commit";
     private static final String PRODUCE_ABORT_TOPIC = NAMESPACE1 + 
"/produce-abort";
     private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
@@ -79,37 +72,10 @@ public class TransactionProduceTest extends 
TransactionTestBase {
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:"; + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createPartitionedTopic(PRODUCE_COMMIT_TOPIC, 3);
-        admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 3);
-        admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 3);
-        admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 3);
-
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 NUM_PARTITIONS);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
-
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+        setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 
TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 
TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 
TOPIC_PARTITION);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index bb172b5..a862405 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -77,36 +77,12 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class TransactionTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final int NUM_BROKERS = 1;
     private static final int NUM_PARTITIONS = 1;
 
     @BeforeMethod
     protected void setup() throws Exception {
-        this.setBrokerCount(NUM_BROKERS);
-        this.internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
- 1];
-        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
-                .serviceUrl("http://localhost:"; + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 NUM_PARTITIONS);
-        pulsarClient.close();
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+       setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 622421b..936e43e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.EventLoopGroup;
 import java.util.ArrayList;
@@ -45,10 +46,10 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -61,6 +62,7 @@ import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.MockZooKeeperSession;
 import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 
 @Slf4j
 public abstract class TransactionTestBase extends TestRetrySupport {
@@ -83,6 +85,9 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
     private OrderedExecutor bkExecutor;
     private NonClosableMockBookKeeper mockBookKeeper;
 
+    public static final String TENANT = "tnx";
+    protected static final String NAMESPACE1 = TENANT + "/ns1";
+
     public void internalSetup() throws Exception {
         incrementSetupNumber();
         init();
@@ -108,6 +113,40 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
         mockBookKeeper = createMockBookKeeper(bkExecutor);
         startBroker();
     }
+    protected void setUpBase(int numBroker,int numPartitionsOfTC, String 
topic, int numPartitions) throws Exception{
+        setBrokerCount(numBroker);
+        internalSetup();
+
+        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
+        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:";
+                + webServicePort).build());
+
+        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
+        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 numPartitionsOfTC);
+        if (topic != null) {
+            admin.tenants().createTenant(TENANT,
+                    new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
+            admin.namespaces().createNamespace(NAMESPACE1);
+            if (numPartitions == 0) {
+                admin.topics().createNonPartitionedTopic(topic);
+            } else {
+                admin.topics().createPartitionedTopic(topic, numPartitions);
+            }
+        }
+        if (pulsarClient != null) {
+            pulsarClient.shutdown();
+        }
+        pulsarClient = PulsarClient.builder()
+                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(true)
+                .build();
+        // wait tc init success to ready state
+        waitForCoordinatorToBeAvailable(numPartitionsOfTC);
+    }
 
     protected void startBroker() throws Exception {
         for (int i = 0; i < brokerCount; i++) {
@@ -295,20 +334,12 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
     }
     public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){
         // wait tc init success to ready state
-        Awaitility.await().until(() -> {
-            Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
-                    
getPulsarServiceList().get(brokerCount-1).getTransactionMetadataStoreService().getStores();
-            if (stores.size() == numOfTCPerBroker) {
-                for (TransactionCoordinatorID transactionCoordinatorID : 
stores.keySet()) {
-                    if (((MLTransactionMetadataStore) 
stores.get(transactionCoordinatorID)).getState()
-                            != TransactionMetadataStoreState.State.Ready) {
-                        return false;
-                    }
-                }
-                return true;
-            } else {
-                return false;
-            }
-        });
+        Awaitility.await()
+                .untilAsserted(() -> {
+                    int transactionMetaStoreCount = pulsarServiceList.stream()
+                            .mapToInt(pulsarService -> 
pulsarService.getTransactionMetadataStoreService().getStores().size())
+                            .sum();
+                    Assert.assertEquals(transactionMetaStoreCount, 
numOfTCPerBroker);
+                });
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
index db9d407..873509f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java
@@ -23,17 +23,12 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
-import com.google.common.collect.Sets;
-
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
-import javax.validation.constraints.AssertTrue;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 
@@ -47,7 +42,6 @@ import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -55,17 +49,13 @@ import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -80,35 +70,12 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class TransactionLowWaterMarkTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String TOPIC = NAMESPACE1 + "/test-topic";
 
     @BeforeMethod(alwaysRun = true)
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:"; + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createNonPartitionedTopic(TOPIC);
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
+        setUpBase(1, 16, TOPIC, 0);
+
         Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
                 
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
         Awaitility.await().until(() -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index e43f262..ef1c761 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -54,35 +54,11 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class TransactionStablePositionTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String TOPIC = NAMESPACE1 + "/test-topic";
 
     @BeforeMethod
     protected void setup() throws Exception {
-        internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:"; + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createNonPartitionedTopic(TOPIC);
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
+        setUpBase(1, 16, TOPIC, 0);
         Awaitility.await().until(() -> ((PulsarClientImpl) pulsarClient)
                 .getTcClient().getState() == 
TransactionCoordinatorClient.State.READY);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index 1725305..0102786 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -20,15 +20,11 @@ package org.apache.pulsar.broker.transaction.coordinator;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -41,15 +37,7 @@ public class TransactionMetaStoreAssignmentTest extends 
TransactionTestBase {
     @Override
     @BeforeMethod(alwaysRun = true)
     protected void setup() throws Exception {
-        setBrokerCount(3);
-        super.internalSetup();
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:"; + webServicePort).build());
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
+        setUpBase(3, 16, null, 0);
         pulsarClient.close();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index fc952c4..bc22473 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.transaction.pendingack;
 
-import com.google.common.collect.Sets;
 
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -35,21 +34,11 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -58,7 +47,6 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -71,39 +59,10 @@ import static org.testng.Assert.assertTrue;
 @Test(groups = "broker")
 public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
 
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:"; + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 NUM_PARTITIONS);
-
-        if (pulsarClient != null) {
-            pulsarClient.shutdown();
-        }
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
-        Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
-                
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+        setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 3820ebc..97f8f51d3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -62,36 +62,13 @@ import org.testng.annotations.Test;
 @Slf4j
 public class PendingAckPersistentTest extends TransactionTestBase {
 
-    private static final String PENDING_ACK_REPLAY_TOPIC = 
"persistent://public/txn/pending-ack-replay";
-
-    private static final String NAMESPACE = "public/txn";
+    private static final String PENDING_ACK_REPLAY_TOPIC = NAMESPACE1 + 
"/pending-ack-replay";
 
     private static final int NUM_PARTITIONS = 16;
 
     @BeforeMethod
     public void setup() throws Exception {
-        setBrokerCount(1);
-        super.internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterDataImpl.builder().serviceUrl("http://localhost:"; + 
webServicePort).build());
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet(), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE, 10);
-        admin.topics().createNonPartitionedTopic(PENDING_ACK_REPLAY_TOPIC);
-
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
+        setUpBase(1, NUM_PARTITIONS, PENDING_ACK_REPLAY_TOPIC, 0);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -312,7 +289,7 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
         String subName = "test-delete";
 
         String topic = TopicName.get(TopicDomain.persistent.toString(),
-                NamespaceName.get(NAMESPACE), "test-delete").toString();
+                NamespaceName.get(NAMESPACE1), "test-delete").toString();
         @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
@@ -325,7 +302,7 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
 
         admin.topics().deleteSubscription(topic, subName);
 
-        List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+        List<String> topics = admin.namespaces().getTopics(NAMESPACE1);
 
         TopicStats topicStats = admin.topics().getStats(topic, false);
 
@@ -341,7 +318,7 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
         String subName2 = "test-delete";
 
         String topic = TopicName.get(TopicDomain.persistent.toString(),
-                NamespaceName.get(NAMESPACE), "test-delete").toString();
+                NamespaceName.get(NAMESPACE1), "test-delete").toString();
         @Cleanup
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                 .topic(topic)
@@ -364,7 +341,7 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
 
         admin.topics().delete(topic);
 
-        List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+        List<String> topics = admin.namespaces().getTopics(NAMESPACE1);
 
         
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
 subName1)));
         
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
 subName2)));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index d7cb6c9..f343742 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -23,9 +23,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
-import com.google.common.collect.Sets;
-
 import java.lang.reflect.Field;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -63,11 +60,8 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.api.proto.CommandAck;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -87,42 +81,14 @@ import org.testng.annotations.Test;
 public class TransactionEndToEndTest extends TransactionTestBase {
 
     private static final int TOPIC_PARTITION = 3;
-
-    private static final String TENANT = "tnx";
-    private static final String NAMESPACE1 = TENANT + "/ns1";
     private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
     private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + 
"/message-ack-test";
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
-        setBrokerCount(1);
-        internalSetup();
-
-        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
-        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:"; + webServicePort).build());
-        admin.tenants().createTenant(TENANT,
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace(NAMESPACE1);
-        admin.topics().createPartitionedTopic(TOPIC_OUTPUT, TOPIC_PARTITION);
+        setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
         admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
-
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 NUM_PARTITIONS);
-
-        if (pulsarClient != null) {
-            pulsarClient.close();
-        }
-        pulsarClient = PulsarClient.builder()
-                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
-
-        // wait tc init success to ready state
-        waitForCoordinatorToBeAvailable(NUM_PARTITIONS);    }
+    }
 
     @AfterMethod(alwaysRun = true)
     protected void cleanup() {

Reply via email to