This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1977a84  Add getTxnID method in Transaction.java (#11438)
1977a84 is described below

commit 1977a84179cda1c830617025f5f785563c068088
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Tue Jul 27 18:04:14 2021 +0800

    Add getTxnID method in Transaction.java (#11438)
---
 .../pulsar/broker/transaction/TransactionTest.java | 34 ++++++++++++++++++++--
 .../broker/transaction/TransactionTestBase.java    | 28 +++++++++++++-----
 .../pulsar/client/api/transaction/Transaction.java |  5 ++++
 .../client/impl/transaction/TransactionImpl.java   |  5 ++++
 4 files changed, 62 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ce8b3ae..26a0260 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.transaction;
 
 import static 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
 import com.google.common.collect.Sets;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -32,12 +34,18 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -53,15 +61,18 @@ public class TransactionTest extends TransactionTestBase {
 
     private static final String TENANT = "tnx";
     private static final String NAMESPACE1 = TENANT + "/ns1";
+    private static final int NUM_BROKERS = 1;
+    private static final int NUM_PARTITIONS = 1;
 
     @BeforeMethod
     protected void setup() throws Exception {
-        this.setBrokerCount(1);
+        this.setBrokerCount(NUM_BROKERS);
         this.internalSetup();
 
         String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
         String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
- 1];
-        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
+        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
+                .serviceUrl("http://localhost:" + webServicePort).build());
         admin.tenants().createTenant(TENANT,
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace(NAMESPACE1);
@@ -69,7 +80,7 @@ public class TransactionTest extends TransactionTestBase {
         
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
-        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 NUM_PARTITIONS);
         pulsarClient.close();
         pulsarClient = PulsarClient.builder()
                 
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
@@ -139,4 +150,21 @@ public class TransactionTest extends TransactionTestBase {
                 .enableBatchIndexAcknowledgment(true)
                 .subscribe();
     }
+
+    @Test
+    public void testGetTxnID() throws Exception {
+        // wait tc init success to ready state
+        Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKERS, 
NUM_PARTITIONS));
+        Transaction transaction = pulsarClient.newTransaction()
+                .build().get();
+        TxnID txnID = transaction.getTxnID();
+        Assert.assertEquals(txnID.getLeastSigBits(), 0);
+        Assert.assertEquals(txnID.getMostSigBits(), 0);
+        transaction.abort();
+        transaction = pulsarClient.newTransaction()
+                .build().get();
+        txnID = transaction.getTxnID();
+        Assert.assertEquals(txnID.getLeastSigBits(), 1);
+        Assert.assertEquals(txnID.getMostSigBits(), 0);
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index a2091f4..651b2dd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -21,19 +21,17 @@ package org.apache.pulsar.broker.transaction;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.channel.EventLoopGroup;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-import io.netty.channel.EventLoopGroup;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -49,17 +47,22 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
-import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.MockZooKeeperSession;
 import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
 
 @Slf4j
 public abstract class TransactionTestBase extends TestRetrySupport {
@@ -291,5 +294,16 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
             log.warn("Failed to clean up mocked pulsar service:", e);
         }
     }
-
+    public boolean waitForCoordinatorToBeAvailable(int numOfBroker, int 
numOfTCPerBroker){
+        // wait tc init success to ready state
+        Awaitility.await().untilAsserted(() -> {
+            TransactionMetadataStore transactionMetadataStore =
+                    getPulsarServiceList().get(numOfBroker - 
1).getTransactionMetadataStoreService()
+                            
.getStores().get(TransactionCoordinatorID.get(numOfTCPerBroker - 1));
+            assertNotNull(transactionMetadataStore);
+            assertEquals(((MLTransactionMetadataStore) 
transactionMetadataStore).getState(),
+                    TransactionMetadataStoreState.State.Ready);
+        });
+        return true;
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
index af2df9e..fd4cf0b 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
@@ -43,4 +43,9 @@ public interface Transaction {
      */
     CompletableFuture<Void> abort();
 
+    /**
+     * Get TxnID of the transaction.
+     *  @return {@link TxnID} the txnID.
+     */
+    TxnID getTxnID();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 1b6655e..60c7829 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -208,6 +208,11 @@ public class TransactionImpl implements Transaction {
         });
     }
 
+    @Override
+    public TxnID getTxnID() {
+        return new TxnID(txnIdMostBits, txnIdLeastBits);
+    }
+
     private CompletableFuture<Void> checkIfOpen() {
         if (state == State.OPEN) {
             return CompletableFuture.completedFuture(null);

Reply via email to