This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new cb703ca Optimize Tests (#12560) cb703ca is described below commit cb703cab9b58bc7fe2da5694e2404538c44dab7a Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Mon Nov 1 23:36:38 2021 +0800 Optimize Tests (#12560) --- .../TopicTransactionBufferRecoverTest.java | 36 +---------- .../TransactionClientReconnectTest.java | 31 +--------- .../broker/transaction/TransactionProduceTest.java | 42 ++----------- .../pulsar/broker/transaction/TransactionTest.java | 26 +------- .../broker/transaction/TransactionTestBase.java | 69 ++++++++++++++++------ .../buffer/TransactionLowWaterMarkTest.java | 39 +----------- .../buffer/TransactionStablePositionTest.java | 26 +------- .../TransactionMetaStoreAssignmentTest.java | 14 +---- .../pendingack/PendingAckInMemoryDeleteTest.java | 43 +------------- .../pendingack/PendingAckPersistentTest.java | 35 ++--------- .../client/impl/TransactionEndToEndTest.java | 38 +----------- 11 files changed, 72 insertions(+), 327 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index 3607b45..335cecc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.transaction; -import com.google.common.collect.Sets; - import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -55,10 +53,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.events.EventsTopicNames; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; @@ -75,8 +70,6 @@ import static org.testng.Assert.assertTrue; @Slf4j public class TopicTransactionBufferRecoverTest extends TransactionTestBase { - private static final String TENANT = "tnx"; - private static final String NAMESPACE1 = TENANT + "/ns1"; private static final String RECOVER_COMMIT = NAMESPACE1 + "/recover-commit"; private static final String RECOVER_ABORT = NAMESPACE1 + "/recover-abort"; private static final String SUBSCRIPTION_NAME = "test-recover"; @@ -85,36 +78,9 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase { private static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { - setBrokerCount(1); - internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(TENANT, - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NAMESPACE1); - admin.topics().createNonPartitionedTopic(RECOVER_COMMIT); + setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0); admin.topics().createNonPartitionedTopic(RECOVER_ABORT); admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT); - - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); - - if (pulsarClient != null) { - pulsarClient.shutdown(); - } - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); - - - // wait tc init success to ready state - waitForCoordinatorToBeAvailable(NUM_PARTITIONS); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java index 1f5ab15..41f98c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java @@ -18,19 +18,13 @@ */ package org.apache.pulsar.broker.transaction; -import com.google.common.collect.Sets; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; -import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; @@ -49,33 +43,12 @@ import static org.testng.FileAssert.fail; public class TransactionClientReconnectTest extends TransactionTestBase { - private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test"; + private static final String RECONNECT_TOPIC = NAMESPACE1 + "/txn-client-reconnect-test"; private static final int NUM_PARTITIONS = 1; @BeforeMethod(alwaysRun = true) public void setup() throws Exception { - setBrokerCount(1); - super.internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant("public", - new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace("public/txn", 10); - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createNonPartitionedTopic(RECONNECT_TOPIC); + setUpBase(1, NUM_PARTITIONS, RECONNECT_TOPIC, 0); admin.topics().createSubscription(RECONNECT_TOPIC, "test", MessageId.latest); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); - - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); - // wait tc init success to ready state - waitForCoordinatorToBeAvailable(NUM_PARTITIONS); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java index 0e63f53..cbae03b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; -import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; @@ -50,10 +49,7 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.api.proto.MarkerType; import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; import org.testng.Assert; @@ -69,9 +65,6 @@ import org.testng.annotations.Test; public class TransactionProduceTest extends TransactionTestBase { private static final int TOPIC_PARTITION = 3; - - private static final String TENANT = "tnx"; - private static final String NAMESPACE1 = TENANT + "/ns1"; private static final String PRODUCE_COMMIT_TOPIC = NAMESPACE1 + "/produce-commit"; private static final String PRODUCE_ABORT_TOPIC = NAMESPACE1 + "/produce-abort"; private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit"; @@ -79,37 +72,10 @@ public class TransactionProduceTest extends TransactionTestBase { private static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { - setBrokerCount(1); - internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(TENANT, - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NAMESPACE1); - admin.topics().createPartitionedTopic(PRODUCE_COMMIT_TOPIC, 3); - admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 3); - admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 3); - admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 3); - - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); - - if (pulsarClient != null) { - pulsarClient.shutdown(); - } - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); - - - // wait tc init success to ready state - waitForCoordinatorToBeAvailable(NUM_PARTITIONS); + setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION); + admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, TOPIC_PARTITION); + admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, TOPIC_PARTITION); + admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, TOPIC_PARTITION); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index bb172b5..a862405 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -77,36 +77,12 @@ import org.testng.annotations.Test; @Test(groups = "broker") public class TransactionTest extends TransactionTestBase { - private static final String TENANT = "tnx"; - private static final String NAMESPACE1 = TENANT + "/ns1"; private static final int NUM_BROKERS = 1; private static final int NUM_PARTITIONS = 1; @BeforeMethod protected void setup() throws Exception { - this.setBrokerCount(NUM_BROKERS); - this.internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder() - .serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(TENANT, - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NAMESPACE1); - - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); - pulsarClient.close(); - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); - // wait tc init success to ready state - waitForCoordinatorToBeAvailable(NUM_PARTITIONS); + setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 622421b..936e43e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; import java.util.ArrayList; @@ -45,10 +46,10 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor; import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; -import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; -import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; -import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -61,6 +62,7 @@ import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.MockZooKeeperSession; import org.apache.zookeeper.ZooKeeper; import org.awaitility.Awaitility; +import org.testng.Assert; @Slf4j public abstract class TransactionTestBase extends TestRetrySupport { @@ -83,6 +85,9 @@ public abstract class TransactionTestBase extends TestRetrySupport { private OrderedExecutor bkExecutor; private NonClosableMockBookKeeper mockBookKeeper; + public static final String TENANT = "tnx"; + protected static final String NAMESPACE1 = TENANT + "/ns1"; + public void internalSetup() throws Exception { incrementSetupNumber(); init(); @@ -108,6 +113,40 @@ public abstract class TransactionTestBase extends TestRetrySupport { mockBookKeeper = createMockBookKeeper(bkExecutor); startBroker(); } + protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int numPartitions) throws Exception{ + setBrokerCount(numBroker); + internalSetup(); + + String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); + String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; + admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + + webServicePort).build()); + + admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), numPartitionsOfTC); + if (topic != null) { + admin.tenants().createTenant(TENANT, + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(NAMESPACE1); + if (numPartitions == 0) { + admin.topics().createNonPartitionedTopic(topic); + } else { + admin.topics().createPartitionedTopic(topic, numPartitions); + } + } + if (pulsarClient != null) { + pulsarClient.shutdown(); + } + pulsarClient = PulsarClient.builder() + .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) + .statsInterval(0, TimeUnit.SECONDS) + .enableTransaction(true) + .build(); + // wait tc init success to ready state + waitForCoordinatorToBeAvailable(numPartitionsOfTC); + } protected void startBroker() throws Exception { for (int i = 0; i < brokerCount; i++) { @@ -295,20 +334,12 @@ public abstract class TransactionTestBase extends TestRetrySupport { } public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){ // wait tc init success to ready state - Awaitility.await().until(() -> { - Map<TransactionCoordinatorID, TransactionMetadataStore> stores = - getPulsarServiceList().get(brokerCount-1).getTransactionMetadataStoreService().getStores(); - if (stores.size() == numOfTCPerBroker) { - for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) { - if (((MLTransactionMetadataStore) stores.get(transactionCoordinatorID)).getState() - != TransactionMetadataStoreState.State.Ready) { - return false; - } - } - return true; - } else { - return false; - } - }); + Awaitility.await() + .untilAsserted(() -> { + int transactionMetaStoreCount = pulsarServiceList.stream() + .mapToInt(pulsarService -> pulsarService.getTransactionMetadataStoreService().getStores().size()) + .sum(); + Assert.assertEquals(transactionMetaStoreCount, numOfTCPerBroker); + }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index db9d407..873509f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -23,17 +23,12 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - -import com.google.common.collect.Sets; - import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - -import javax.validation.constraints.AssertTrue; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -47,7 +42,6 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; @@ -55,17 +49,13 @@ import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; + import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; -import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.testng.Assert; @@ -80,35 +70,12 @@ import org.testng.annotations.Test; @Test(groups = "broker") public class TransactionLowWaterMarkTest extends TransactionTestBase { - private static final String TENANT = "tnx"; - private static final String NAMESPACE1 = TENANT + "/ns1"; private static final String TOPIC = NAMESPACE1 + "/test-topic"; @BeforeMethod(alwaysRun = true) protected void setup() throws Exception { - setBrokerCount(1); - internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(TENANT, - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NAMESPACE1); - admin.topics().createNonPartitionedTopic(TOPIC); - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16); - - if (pulsarClient != null) { - pulsarClient.shutdown(); - } - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); + setUpBase(1, 16, TOPIC, 0); + Map<TransactionCoordinatorID, TransactionMetadataStore> stores = getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores(); Awaitility.await().until(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java index e43f262..ef1c761 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java @@ -54,35 +54,11 @@ import org.testng.annotations.Test; @Test(groups = "broker") public class TransactionStablePositionTest extends TransactionTestBase { - private static final String TENANT = "tnx"; - private static final String NAMESPACE1 = TENANT + "/ns1"; private static final String TOPIC = NAMESPACE1 + "/test-topic"; @BeforeMethod protected void setup() throws Exception { - internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(TENANT, - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NAMESPACE1); - admin.topics().createNonPartitionedTopic(TOPIC); - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16); - - if (pulsarClient != null) { - pulsarClient.shutdown(); - } - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); - + setUpBase(1, 16, TOPIC, 0); Awaitility.await().until(() -> ((PulsarClientImpl) pulsarClient) .getTcClient().getState() == TransactionCoordinatorClient.State.READY); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java index 1725305..0102786 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java @@ -20,15 +20,11 @@ package org.apache.pulsar.broker.transaction.coordinator; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.collect.Sets; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ServiceUrlProvider; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.awaitility.Awaitility; import org.testng.Assert; @@ -41,15 +37,7 @@ public class TransactionMetaStoreAssignmentTest extends TransactionTestBase { @Override @BeforeMethod(alwaysRun = true) protected void setup() throws Exception { - setBrokerCount(3); - super.internalSetup(); - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16); + setUpBase(3, 16, null, 0); pulsarClient.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index fc952c4..bc22473 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.transaction.pendingack; -import com.google.common.collect.Sets; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -35,21 +34,11 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; -import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; -import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; -import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; -import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; -import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -58,7 +47,6 @@ import org.testng.annotations.Test; import java.lang.reflect.Field; import java.util.HashMap; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; @@ -71,39 +59,10 @@ import static org.testng.Assert.assertTrue; @Test(groups = "broker") public class PendingAckInMemoryDeleteTest extends TransactionTestBase { - private static final String TENANT = "tnx"; - private static final String NAMESPACE1 = TENANT + "/ns1"; private static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { - setBrokerCount(1); - internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(TENANT, - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NAMESPACE1); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); - - if (pulsarClient != null) { - pulsarClient.shutdown(); - } - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); - - Map<TransactionCoordinatorID, TransactionMetadataStore> stores = - getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores(); - // wait tc init success to ready state - waitForCoordinatorToBeAvailable(NUM_PARTITIONS); + setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 3820ebc..97f8f51d3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -62,36 +62,13 @@ import org.testng.annotations.Test; @Slf4j public class PendingAckPersistentTest extends TransactionTestBase { - private static final String PENDING_ACK_REPLAY_TOPIC = "persistent://public/txn/pending-ack-replay"; - - private static final String NAMESPACE = "public/txn"; + private static final String PENDING_ACK_REPLAY_TOPIC = NAMESPACE1 + "/pending-ack-replay"; private static final int NUM_PARTITIONS = 16; @BeforeMethod public void setup() throws Exception { - setBrokerCount(1); - super.internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterDataImpl.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16); - admin.tenants().createTenant("public", - new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NAMESPACE, 10); - admin.topics().createNonPartitionedTopic(PENDING_ACK_REPLAY_TOPIC); - - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); - // wait tc init success to ready state - waitForCoordinatorToBeAvailable(NUM_PARTITIONS); + setUpBase(1, NUM_PARTITIONS, PENDING_ACK_REPLAY_TOPIC, 0); } @AfterMethod(alwaysRun = true) @@ -312,7 +289,7 @@ public class PendingAckPersistentTest extends TransactionTestBase { String subName = "test-delete"; String topic = TopicName.get(TopicDomain.persistent.toString(), - NamespaceName.get(NAMESPACE), "test-delete").toString(); + NamespaceName.get(NAMESPACE1), "test-delete").toString(); @Cleanup Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) @@ -325,7 +302,7 @@ public class PendingAckPersistentTest extends TransactionTestBase { admin.topics().deleteSubscription(topic, subName); - List<String> topics = admin.namespaces().getTopics(NAMESPACE); + List<String> topics = admin.namespaces().getTopics(NAMESPACE1); TopicStats topicStats = admin.topics().getStats(topic, false); @@ -341,7 +318,7 @@ public class PendingAckPersistentTest extends TransactionTestBase { String subName2 = "test-delete"; String topic = TopicName.get(TopicDomain.persistent.toString(), - NamespaceName.get(NAMESPACE), "test-delete").toString(); + NamespaceName.get(NAMESPACE1), "test-delete").toString(); @Cleanup Consumer<byte[]> consumer1 = pulsarClient.newConsumer() .topic(topic) @@ -364,7 +341,7 @@ public class PendingAckPersistentTest extends TransactionTestBase { admin.topics().delete(topic); - List<String> topics = admin.namespaces().getTopics(NAMESPACE); + List<String> topics = admin.namespaces().getTopics(NAMESPACE1); assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName1))); assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index d7cb6c9..f343742 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -23,9 +23,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - -import com.google.common.collect.Sets; - import java.lang.reflect.Field; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -63,11 +60,8 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.api.proto.CommandAck; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -87,42 +81,14 @@ import org.testng.annotations.Test; public class TransactionEndToEndTest extends TransactionTestBase { private static final int TOPIC_PARTITION = 3; - - private static final String TENANT = "tnx"; - private static final String NAMESPACE1 = TENANT + "/ns1"; private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output"; private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test"; private static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { - setBrokerCount(1); - internalSetup(); - - String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); - String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; - admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); - admin.tenants().createTenant(TENANT, - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NAMESPACE1); - admin.topics().createPartitionedTopic(TOPIC_OUTPUT, TOPIC_PARTITION); + setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION); admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1); - - admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); - admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); - - if (pulsarClient != null) { - pulsarClient.close(); - } - pulsarClient = PulsarClient.builder() - .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) - .statsInterval(0, TimeUnit.SECONDS) - .enableTransaction(true) - .build(); - - // wait tc init success to ready state - waitForCoordinatorToBeAvailable(NUM_PARTITIONS); } + } @AfterMethod(alwaysRun = true) protected void cleanup() {