This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ac91eb51009636a0b68dbc7271f63b62ab6f3350
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Mon May 3 16:46:47 2021 -0700

    KUDU-2612: update Java client API to commit a transaction
    
    This patch updates the signature of the KuduTransaction::commit()
    method to address recent feedback on the txn-related API.  The idea is
    to make the API easier to use, since txn.commit(false) looks a bit vague
    and might be confusing as well.
    
    In essence, the 'wait' parameter is gone and now there are two methods:
      * KuduTransaction.commit()
      * KuduTransaction.startCommit()
    
    The former starts committing a multi-row transaction and waits for the
    commit phase to finalize.  The latter just starts the commit phase for
    a multi-row transaction and returns, not awaiting for the commit phase
    to finalize.
    
    These changes mirror recent changes in the C++ client API.
    
    Change-Id: Ia8c48b4d375945649c48428401f09ec5c7cc8cb7
    Reviewed-on: http://gerrit.cloudera.org:8080/17393
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <granthe...@apache.org>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../org/apache/kudu/client/KuduTransaction.java    | 99 ++++++++++++++--------
 .../apache/kudu/client/TestKuduTransaction.java    | 46 +++++-----
 2 files changed, 87 insertions(+), 58 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 042ec69..9ea71b9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -147,6 +147,8 @@ public class KuduTransaction implements AutoCloseable {
       LoggerFactory.getLogger(KuduTransaction.class);
   private static final SerializationOptions defaultSerializationOptions =
       new SerializationOptions();
+  private static final String ERRMSG_TXN_NOT_OPEN =
+      "transaction is not open for this handle";
 
   private final AsyncKuduClient client;
   private long txnId = AsyncKuduClient.INVALID_TXN_ID;
@@ -259,41 +261,28 @@ public class KuduTransaction implements AutoCloseable {
 
   /**
    * Commit the multi-row distributed transaction represented by this handle.
+   * <p>
+   * This method starts committing the transaction and awaits for the commit
+   * phase to finalize.
    *
-   * @param wait whether to wait for the transaction's commit phase to 
finalize.
-   *             If {@code true}, this method blocks until the commit is
-   *             finalized, otherwise it starts committing the transaction and
-   *             returns. In the latter case, it's possible to check for the
-   *             transaction status using the
-   *             {@link KuduTransaction#isCommitComplete} method.
    * @throws KuduException if something went wrong
    */
-  public void commit(boolean wait) throws KuduException {
-    Preconditions.checkState(isInFlight, "transaction is not open for this 
handle");
-    CommitTransactionRequest req = doCommitTransaction();
-    // Now, there is no need to continue sending keepalive messages: the
-    // transaction should be in COMMIT_IN_PROGRESS state after successful
-    // completion of the calls above, and the backend takes care of everything
-    // else: nothing is required from the client side to successfully complete
-    // the commit phase of the transaction past this point.
-    synchronized (keepaliveTaskHandleSync) {
-      if (keepaliveTaskHandle != null) {
-        LOG.debug("stopping keepalive heartbeating after commit (txn ID {})", 
txnId);
-        keepaliveTaskHandle.cancel();
-      }
-    }
-
-    if (wait) {
-      Deferred<GetTransactionStateResponse> txnState =
-          getDelayedIsTransactionCommittedDeferred(req);
-      KuduClient.joinAndHandleException(txnState);
-    }
+  public void commit() throws KuduException {
+    commitWithMode(CommitMode.WAIT_FOR_COMPLETION);
+  }
 
-    // Once everything else is completed successfully, mark the transaction as
-    // no longer in flight.
-    synchronized (isInFlightSync) {
-      isInFlight = false;
-    }
+  /**
+   * Start committing the multi-row distributed transaction represented by
+   * this handle.
+   * <p>
+   * This method only starts committing the transaction, not awaiting for the
+   * commit phase to finalize. Use {@link KuduTransaction#isCommitComplete()}
+   * to check whether the transaction is committed.
+   *
+   * @throws KuduException if something went wrong upon starting to commit
+   */
+  public void startCommit() throws KuduException {
+    commitWithMode(CommitMode.START_ONLY);
   }
 
   /**
@@ -346,7 +335,7 @@ public class KuduTransaction implements AutoCloseable {
    * @throws KuduException if something went wrong
    */
   public void rollback() throws KuduException {
-    Preconditions.checkState(isInFlight, "transaction is not open for this 
handle");
+    Preconditions.checkState(isInFlight, ERRMSG_TXN_NOT_OPEN);
     doRollbackTransaction();
     // Now, there is no need to continue sending keepalive messages.
     synchronized (keepaliveTaskHandleSync) {
@@ -490,6 +479,45 @@ public class KuduTransaction implements AutoCloseable {
     return request;
   }
 
+  /**
+   * Transaction commit mode.
+   */
+  private enum CommitMode {
+    /** Only start/initiate the commit phase, don't wait for the completion. */
+    START_ONLY,
+
+    /** Start the commit phase and wait until it succeeds or fails. */
+    WAIT_FOR_COMPLETION,
+  }
+
+  private void commitWithMode(CommitMode mode) throws KuduException {
+    Preconditions.checkState(isInFlight, ERRMSG_TXN_NOT_OPEN);
+    CommitTransactionRequest req = doCommitTransaction();
+    // Now, there is no need to continue sending keepalive messages: the
+    // transaction should be in COMMIT_IN_PROGRESS state after successful
+    // completion of the calls above, and the backend takes care of everything
+    // else: nothing is required from the client side to successfully complete
+    // the commit phase of the transaction past this point.
+    synchronized (keepaliveTaskHandleSync) {
+      if (keepaliveTaskHandle != null) {
+        LOG.debug("stopping keepalive heartbeating after initiating commit 
(txn ID {})", txnId);
+        keepaliveTaskHandle.cancel();
+      }
+    }
+
+    if (mode == CommitMode.WAIT_FOR_COMPLETION) {
+      Deferred<GetTransactionStateResponse> txnState =
+          getDelayedIsTransactionCommittedDeferred(req);
+      KuduClient.joinAndHandleException(txnState);
+    }
+
+    // Once everything else is completed successfully, mark the transaction as
+    // no longer in flight.
+    synchronized (isInFlightSync) {
+      isInFlight = false;
+    }
+  }
+
   private Deferred<GetTransactionStateResponse> isTransactionCommittedAsync() {
     GetTransactionStateRequest request = new GetTransactionStateRequest(
         client.getMasterTable(),
@@ -664,8 +692,8 @@ public class KuduTransaction implements AutoCloseable {
   }
 
   /**
-   * Schedule a timer to send a KeepTransactiveAlive RPC to TxnManager after
-   * @c sleepTimeMillis milliseconds.
+   * Schedule a timer to send a KeepTransactionAlive RPC to TxnManager after
+   * sleepTimeMillis milliseconds.
    *
    * @param runAfterMillis time delta from now when to run the task
    * @param callback callback to call on successfully sent RPC
@@ -683,7 +711,8 @@ public class KuduTransaction implements AutoCloseable {
       }
     }
 
-    return client.newTimeout(client.getTimer(), new RetryTimer(), 
runAfterMillis);
+    return AsyncKuduClient.newTimeout(
+        client.getTimer(), new RetryTimer(), runAfterMillis);
   }
 
   private Callback<Void, KeepTransactionAliveResponse> 
getSendKeepTransactionAliveCB() {
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index c8f9f2c..440e82d 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -166,14 +166,14 @@ public class TestKuduTransaction {
   })
   public void testCommitAnEmptyTransaction() throws Exception {
     KuduTransaction txn = client.newTransaction();
-    txn.commit(false);
+    txn.startCommit();
     // A duplicate call to commit the transaction using the same handle
     // should fail.
     IllegalStateException ex = assertThrows(
         IllegalStateException.class, new ThrowingRunnable() {
           @Override
           public void run() throws Throwable {
-            txn.commit(false);
+            txn.startCommit();
           }
         });
     assertEquals("transaction is not open for this handle", ex.getMessage());
@@ -183,7 +183,7 @@ public class TestKuduTransaction {
     // since committing a transaction has idempotent semantics for the 
back-end.
     byte[] buf = txn.serialize();
     KuduTransaction serdesTxn = KuduTransaction.deserialize(buf, asyncClient);
-    serdesTxn.commit(false);
+    serdesTxn.startCommit();
   }
 
   /**
@@ -200,7 +200,7 @@ public class TestKuduTransaction {
     try {
       // Try to commit the transaction in non-synchronous mode, i.e. just
       // initiate committing the transaction.
-      fakeTxn.commit(false);
+      fakeTxn.startCommit();
       fail("committing a non-existing transaction should have failed");
     } catch (NonRecoverableException e) {
       final String errmsg = e.getMessage();
@@ -214,7 +214,7 @@ public class TestKuduTransaction {
     try {
       // Try to commit the transaction in synchronous mode, i.e. initiate
       // committing the transaction and wait for the commit phase to finalize.
-      fakeTxn.commit(true);
+      fakeTxn.commit();
       fail("committing a non-existing transaction should have failed");
     } catch (NonRecoverableException e) {
       final String errmsg = e.getMessage();
@@ -241,7 +241,7 @@ public class TestKuduTransaction {
   })
   public void testIsCommitComplete() throws Exception {
     KuduTransaction txn = client.newTransaction();
-    txn.commit(false);
+    txn.startCommit();
     assertFalse(txn.isCommitComplete());
   }
 
@@ -315,7 +315,7 @@ public class TestKuduTransaction {
   })
   public void testCommitAnEmptyTransactionWait() throws Exception {
     KuduTransaction txn = client.newTransaction();
-    txn.commit(true);
+    txn.commit();
     assertTrue(txn.isCommitComplete());
   }
 
@@ -467,7 +467,7 @@ public class TestKuduTransaction {
     try (KuduTransaction txn = client.newTransaction()) {
       buf = txn.serialize();
       assertNotNull(buf);
-      txn.commit(false);
+      txn.startCommit();
       txn.isCommitComplete();
     } catch (Exception e) {
       fail("unexpected exception: " + e.toString());
@@ -587,7 +587,7 @@ public class TestKuduTransaction {
       // It should be possible to commit the transaction since it supposed to 
be
       // open at this point even after multiples of the inactivity timeout
       // interval.
-      txn.commit(false);
+      txn.startCommit();
     } catch (Exception e) {
       fail("unexpected exception: " + e.toString());
     }
@@ -614,7 +614,7 @@ public class TestKuduTransaction {
           NonRecoverableException.class, new ThrowingRunnable() {
             @Override
             public void run() throws Throwable {
-              txn.commit(false);
+              txn.startCommit();
             }
           });
       final String errmsg = ex.getMessage();
@@ -674,7 +674,7 @@ public class TestKuduTransaction {
           NonRecoverableException.class, new ThrowingRunnable() {
             @Override
             public void run() throws Throwable {
-              serdesTxn.commit(false);
+              serdesTxn.startCommit();
             }
           });
       final String errmsg = ex.getMessage();
@@ -715,11 +715,11 @@ public class TestKuduTransaction {
       // At this point, the underlying transaction should be kept open
       // because the 'serdesTxn' handle sends keepalive heartbeats even if the
       // original handle ceased to send those after calling 'close()' on it.
-      // As an extra sanity check, call 'commit()' and 'isCommitComplete()'
+      // As an extra sanity check, call 'startCommit()' and 
'isCommitComplete()'
       // on both handles to make sure no exception is thrown.
-      serdesTxn.commit(false);
+      serdesTxn.startCommit();
       serdesTxn.isCommitComplete();
-      txn.commit(false);
+      txn.startCommit();
       txn.isCommitComplete();
     }
   }
@@ -727,7 +727,7 @@ public class TestKuduTransaction {
   /**
    * This scenario validates the propagation of the commit timestamp for a
    * multi-row transaction when committing the transaction synchronously via
-   * {@link KuduTransaction#commit(boolean wait = true)} or calling
+   * {@link KuduTransaction#commit()} or calling
    * {@link KuduTransaction#isCommitComplete()} once the transaction's commit
    * has started to run asynchronously.
    */
@@ -769,7 +769,7 @@ public class TestKuduTransaction {
       assertEquals(0, session.countPendingErrors());
 
       final long tsBeforeCommit = client.getLastPropagatedTimestamp();
-      txn.commit(true /*wait*/);
+      txn.commit();
       final long tsAfterCommit = client.getLastPropagatedTimestamp();
       assertTrue(tsAfterCommit > tsBeforeCommit);
     }
@@ -792,7 +792,7 @@ public class TestKuduTransaction {
       assertEquals(0, session.countPendingErrors());
 
       final long tsBeforeCommit = client.getLastPropagatedTimestamp();
-      txn.commit(false /*wait*/);
+      txn.startCommit();
       assertEquals(tsBeforeCommit, client.getLastPropagatedTimestamp());
 
       assertEventuallyTrue("commit should eventually finalize",
@@ -821,7 +821,7 @@ public class TestKuduTransaction {
     {
       KuduTransaction txn = client.newTransaction();
       final long tsBeforeCommit = client.getLastPropagatedTimestamp();
-      txn.commit(true /*wait*/);
+      txn.commit();
 
       // Just in case, linger a bit after commit has been finalized, checking
       // for the timestamp propagated to the client side.
@@ -882,7 +882,7 @@ public class TestKuduTransaction {
       harness.startAllMasterServers();
 
       // It should be possible to commit the transaction.
-      txn.commit(true /*wait*/);
+      txn.commit();
 
       // An extra sanity check: read back the rows written into the table in 
the
       // context of the transaction.
@@ -912,7 +912,7 @@ public class TestKuduTransaction {
       // It should be possible to commit the transaction: 2 out of 3 masters 
are
       // running and Raft should be able to establish a leader master. So,
       // txn-related operations routed through TxnManager should succeed.
-      txn.commit(true /*wait*/);
+      txn.commit();
 
       // An extra sanity check: read back the rows written into the table in 
the
       // context of the transaction.
@@ -975,7 +975,7 @@ public class TestKuduTransaction {
     t.start();
 
     // It should be possible to commit the transaction.
-    txn.commit(true /*wait*/);
+    txn.commit();
 
     // Just an extra sanity check: the thread should join pretty fast, 
otherwise
     // the call to KuduTransaction.commit() above could not succeed.
@@ -1048,7 +1048,7 @@ public class TestKuduTransaction {
     //
     //   * the client switches to the new TxnManager for other txn-related
     //     operations as well
-    txn.commit(true /*wait*/);
+    txn.commit();
 
     // An extra sanity check: read back the rows written into the table in the
     // context of the transaction.
@@ -1146,7 +1146,7 @@ public class TestKuduTransaction {
 
     // The transaction should be still alive, and it should be possible to
     // commit it.
-    txn.commit(true /*wait*/);
+    txn.commit();
 
     t.join();
 

Reply via email to