This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b784a41  KUDU-2612 register txn Java sessions with client
b784a41 is described below

commit b784a41290365d7eacb10048b45ea86708b5b314
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Fri May 14 19:22:24 2021 -0700

    KUDU-2612 register txn Java sessions with client
    
    Prior to this patch, the assertion in AsyncKuduClient.removeSession()
    would trigger upon closing a transactional session via
    {AsyncKuduSession,KuduSession}.close().
    
    This patch addresses the issue by registering the newly created
    transactional session with the corresponding AsyncKuduClient.  In addition,
    a test scenario is added to catch regressions.
    
    This is a follow-up to 7432a7a8aa0c98f8d2177c25b99576b51ff33a93.
    
    Change-Id: I117abc938bca0e698854d944c3fd6f831d3f9ce0
    Reviewed-on: http://gerrit.cloudera.org:8080/17451
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 11 +++++
 .../org/apache/kudu/client/KuduTransaction.java    |  7 +--
 .../apache/kudu/client/TestKuduTransaction.java    | 51 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 5 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 35f8477..cfa03cd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -2614,6 +2614,17 @@ public class AsyncKuduClient implements AutoCloseable {
     return closeAllSessions().addCallbackDeferring(new DisconnectCB());
   }
 
+  // Create a new transactional session in the context of the transaction
+  // with the specified identifier.
+  AsyncKuduSession newTransactionalSession(long txnId) {
+    checkIsClosed();
+    AsyncKuduSession session = new AsyncKuduSession(this, txnId);
+    synchronized (sessions) {
+      sessions.add(session);
+    }
+    return session;
+  }
+
   private void checkIsClosed() {
     if (closed) {
       throw new IllegalStateException("Cannot proceed, the client has already 
been closed");
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 9ea71b9..4abf71f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -241,7 +241,7 @@ public class KuduTransaction implements AutoCloseable {
     synchronized (isInFlightSync) {
       Preconditions.checkState(isInFlight);
     }
-    return new AsyncKuduSession(client, txnId);
+    return client.newTransactionalSession(txnId);
   }
 
   /**
@@ -253,10 +253,7 @@ public class KuduTransaction implements AutoCloseable {
    * @return a new {@link KuduSession} instance
    */
   public KuduSession newKuduSession() {
-    synchronized (isInFlightSync) {
-      Preconditions.checkState(isInFlight);
-    }
-    return new KuduSession(new AsyncKuduSession(client, txnId));
+    return new KuduSession(newAsyncKuduSession());
   }
 
   /**
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 0a97e8d..3f753d3 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -227,6 +227,57 @@ public class TestKuduTransaction {
   }
 
   /**
+   * Transactional sessions can be closed as regular ones.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      "--txn_manager_enabled",
+  })
+  public void testTxnSessionClose() throws Exception {
+    final String TABLE_NAME = "txn_session_close";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
2));
+    KuduTable table = client.openTable(TABLE_NAME);
+
+    // Open and close an empty transaction session.
+    {
+      KuduTransaction txn = client.newTransaction();
+      assertNotNull(txn);
+      KuduSession session = txn.newKuduSession();
+      assertNotNull(session);
+      assertFalse(session.isClosed());
+      session.close();
+      assertTrue(session.isClosed());
+    }
+
+    // Open new transaction, insert one row for a session, close the session
+    // and then rollback the transaction. No rows should be persisted.
+    {
+      KuduTransaction txn = client.newTransaction();
+      assertNotNull(txn);
+      KuduSession session = txn.newKuduSession();
+      assertNotNull(session);
+      session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+
+      Insert insert = createBasicSchemaInsert(table, 1);
+      session.apply(insert);
+      session.close();
+
+      txn.rollback();
+
+      assertTrue(session.isClosed());
+      assertEquals(0, session.countPendingErrors());
+
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+          .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+          .build();
+      assertEquals(0, countRowsInScan(scanner));
+    }
+  }
+
+  /**
    * Test scenario that starts a new transaction, initiates its commit phase,
    * and checks whether the commit is complete using the
    * KuduTransaction.isCommitComplete() method.

Reply via email to