This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit ac91eb51009636a0b68dbc7271f63b62ab6f3350 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Mon May 3 16:46:47 2021 -0700 KUDU-2612: update Java client API to commit a transaction This patch updates the signature of the KuduTransaction::commit() method to address recent feedback on the txn-related API. The idea is to make the API easier to use, since txn.commit(false) looks a bit vague and might be confusing as well. In essence, the 'wait' parameter is gone and now there are two methods: * KuduTransaction.commit() * KuduTransaction.startCommit() The former starts committing a multi-row transaction and waits for the commit phase to finalize. The latter just starts the commit phase for a multi-row transaction and returns, not awaiting for the commit phase to finalize. These changes mirror recent changes in the C++ client API. Change-Id: Ia8c48b4d375945649c48428401f09ec5c7cc8cb7 Reviewed-on: http://gerrit.cloudera.org:8080/17393 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <granthe...@apache.org> Reviewed-by: Andrew Wong <aw...@cloudera.com> --- .../org/apache/kudu/client/KuduTransaction.java | 99 ++++++++++++++-------- .../apache/kudu/client/TestKuduTransaction.java | 46 +++++----- 2 files changed, 87 insertions(+), 58 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java index 042ec69..9ea71b9 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java @@ -147,6 +147,8 @@ public class KuduTransaction implements AutoCloseable { LoggerFactory.getLogger(KuduTransaction.class); private static final SerializationOptions defaultSerializationOptions = new SerializationOptions(); + private static final String ERRMSG_TXN_NOT_OPEN = + "transaction is not open for this handle"; private final AsyncKuduClient client; private long txnId = AsyncKuduClient.INVALID_TXN_ID; @@ -259,41 +261,28 @@ public class KuduTransaction implements AutoCloseable { /** * Commit the multi-row distributed transaction represented by this handle. + * <p> + * This method starts committing the transaction and awaits for the commit + * phase to finalize. * - * @param wait whether to wait for the transaction's commit phase to finalize. - * If {@code true}, this method blocks until the commit is - * finalized, otherwise it starts committing the transaction and - * returns. In the latter case, it's possible to check for the - * transaction status using the - * {@link KuduTransaction#isCommitComplete} method. * @throws KuduException if something went wrong */ - public void commit(boolean wait) throws KuduException { - Preconditions.checkState(isInFlight, "transaction is not open for this handle"); - CommitTransactionRequest req = doCommitTransaction(); - // Now, there is no need to continue sending keepalive messages: the - // transaction should be in COMMIT_IN_PROGRESS state after successful - // completion of the calls above, and the backend takes care of everything - // else: nothing is required from the client side to successfully complete - // the commit phase of the transaction past this point. - synchronized (keepaliveTaskHandleSync) { - if (keepaliveTaskHandle != null) { - LOG.debug("stopping keepalive heartbeating after commit (txn ID {})", txnId); - keepaliveTaskHandle.cancel(); - } - } - - if (wait) { - Deferred<GetTransactionStateResponse> txnState = - getDelayedIsTransactionCommittedDeferred(req); - KuduClient.joinAndHandleException(txnState); - } + public void commit() throws KuduException { + commitWithMode(CommitMode.WAIT_FOR_COMPLETION); + } - // Once everything else is completed successfully, mark the transaction as - // no longer in flight. - synchronized (isInFlightSync) { - isInFlight = false; - } + /** + * Start committing the multi-row distributed transaction represented by + * this handle. + * <p> + * This method only starts committing the transaction, not awaiting for the + * commit phase to finalize. Use {@link KuduTransaction#isCommitComplete()} + * to check whether the transaction is committed. + * + * @throws KuduException if something went wrong upon starting to commit + */ + public void startCommit() throws KuduException { + commitWithMode(CommitMode.START_ONLY); } /** @@ -346,7 +335,7 @@ public class KuduTransaction implements AutoCloseable { * @throws KuduException if something went wrong */ public void rollback() throws KuduException { - Preconditions.checkState(isInFlight, "transaction is not open for this handle"); + Preconditions.checkState(isInFlight, ERRMSG_TXN_NOT_OPEN); doRollbackTransaction(); // Now, there is no need to continue sending keepalive messages. synchronized (keepaliveTaskHandleSync) { @@ -490,6 +479,45 @@ public class KuduTransaction implements AutoCloseable { return request; } + /** + * Transaction commit mode. + */ + private enum CommitMode { + /** Only start/initiate the commit phase, don't wait for the completion. */ + START_ONLY, + + /** Start the commit phase and wait until it succeeds or fails. */ + WAIT_FOR_COMPLETION, + } + + private void commitWithMode(CommitMode mode) throws KuduException { + Preconditions.checkState(isInFlight, ERRMSG_TXN_NOT_OPEN); + CommitTransactionRequest req = doCommitTransaction(); + // Now, there is no need to continue sending keepalive messages: the + // transaction should be in COMMIT_IN_PROGRESS state after successful + // completion of the calls above, and the backend takes care of everything + // else: nothing is required from the client side to successfully complete + // the commit phase of the transaction past this point. + synchronized (keepaliveTaskHandleSync) { + if (keepaliveTaskHandle != null) { + LOG.debug("stopping keepalive heartbeating after initiating commit (txn ID {})", txnId); + keepaliveTaskHandle.cancel(); + } + } + + if (mode == CommitMode.WAIT_FOR_COMPLETION) { + Deferred<GetTransactionStateResponse> txnState = + getDelayedIsTransactionCommittedDeferred(req); + KuduClient.joinAndHandleException(txnState); + } + + // Once everything else is completed successfully, mark the transaction as + // no longer in flight. + synchronized (isInFlightSync) { + isInFlight = false; + } + } + private Deferred<GetTransactionStateResponse> isTransactionCommittedAsync() { GetTransactionStateRequest request = new GetTransactionStateRequest( client.getMasterTable(), @@ -664,8 +692,8 @@ public class KuduTransaction implements AutoCloseable { } /** - * Schedule a timer to send a KeepTransactiveAlive RPC to TxnManager after - * @c sleepTimeMillis milliseconds. + * Schedule a timer to send a KeepTransactionAlive RPC to TxnManager after + * sleepTimeMillis milliseconds. * * @param runAfterMillis time delta from now when to run the task * @param callback callback to call on successfully sent RPC @@ -683,7 +711,8 @@ public class KuduTransaction implements AutoCloseable { } } - return client.newTimeout(client.getTimer(), new RetryTimer(), runAfterMillis); + return AsyncKuduClient.newTimeout( + client.getTimer(), new RetryTimer(), runAfterMillis); } private Callback<Void, KeepTransactionAliveResponse> getSendKeepTransactionAliveCB() { diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java index c8f9f2c..440e82d 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java @@ -166,14 +166,14 @@ public class TestKuduTransaction { }) public void testCommitAnEmptyTransaction() throws Exception { KuduTransaction txn = client.newTransaction(); - txn.commit(false); + txn.startCommit(); // A duplicate call to commit the transaction using the same handle // should fail. IllegalStateException ex = assertThrows( IllegalStateException.class, new ThrowingRunnable() { @Override public void run() throws Throwable { - txn.commit(false); + txn.startCommit(); } }); assertEquals("transaction is not open for this handle", ex.getMessage()); @@ -183,7 +183,7 @@ public class TestKuduTransaction { // since committing a transaction has idempotent semantics for the back-end. byte[] buf = txn.serialize(); KuduTransaction serdesTxn = KuduTransaction.deserialize(buf, asyncClient); - serdesTxn.commit(false); + serdesTxn.startCommit(); } /** @@ -200,7 +200,7 @@ public class TestKuduTransaction { try { // Try to commit the transaction in non-synchronous mode, i.e. just // initiate committing the transaction. - fakeTxn.commit(false); + fakeTxn.startCommit(); fail("committing a non-existing transaction should have failed"); } catch (NonRecoverableException e) { final String errmsg = e.getMessage(); @@ -214,7 +214,7 @@ public class TestKuduTransaction { try { // Try to commit the transaction in synchronous mode, i.e. initiate // committing the transaction and wait for the commit phase to finalize. - fakeTxn.commit(true); + fakeTxn.commit(); fail("committing a non-existing transaction should have failed"); } catch (NonRecoverableException e) { final String errmsg = e.getMessage(); @@ -241,7 +241,7 @@ public class TestKuduTransaction { }) public void testIsCommitComplete() throws Exception { KuduTransaction txn = client.newTransaction(); - txn.commit(false); + txn.startCommit(); assertFalse(txn.isCommitComplete()); } @@ -315,7 +315,7 @@ public class TestKuduTransaction { }) public void testCommitAnEmptyTransactionWait() throws Exception { KuduTransaction txn = client.newTransaction(); - txn.commit(true); + txn.commit(); assertTrue(txn.isCommitComplete()); } @@ -467,7 +467,7 @@ public class TestKuduTransaction { try (KuduTransaction txn = client.newTransaction()) { buf = txn.serialize(); assertNotNull(buf); - txn.commit(false); + txn.startCommit(); txn.isCommitComplete(); } catch (Exception e) { fail("unexpected exception: " + e.toString()); @@ -587,7 +587,7 @@ public class TestKuduTransaction { // It should be possible to commit the transaction since it supposed to be // open at this point even after multiples of the inactivity timeout // interval. - txn.commit(false); + txn.startCommit(); } catch (Exception e) { fail("unexpected exception: " + e.toString()); } @@ -614,7 +614,7 @@ public class TestKuduTransaction { NonRecoverableException.class, new ThrowingRunnable() { @Override public void run() throws Throwable { - txn.commit(false); + txn.startCommit(); } }); final String errmsg = ex.getMessage(); @@ -674,7 +674,7 @@ public class TestKuduTransaction { NonRecoverableException.class, new ThrowingRunnable() { @Override public void run() throws Throwable { - serdesTxn.commit(false); + serdesTxn.startCommit(); } }); final String errmsg = ex.getMessage(); @@ -715,11 +715,11 @@ public class TestKuduTransaction { // At this point, the underlying transaction should be kept open // because the 'serdesTxn' handle sends keepalive heartbeats even if the // original handle ceased to send those after calling 'close()' on it. - // As an extra sanity check, call 'commit()' and 'isCommitComplete()' + // As an extra sanity check, call 'startCommit()' and 'isCommitComplete()' // on both handles to make sure no exception is thrown. - serdesTxn.commit(false); + serdesTxn.startCommit(); serdesTxn.isCommitComplete(); - txn.commit(false); + txn.startCommit(); txn.isCommitComplete(); } } @@ -727,7 +727,7 @@ public class TestKuduTransaction { /** * This scenario validates the propagation of the commit timestamp for a * multi-row transaction when committing the transaction synchronously via - * {@link KuduTransaction#commit(boolean wait = true)} or calling + * {@link KuduTransaction#commit()} or calling * {@link KuduTransaction#isCommitComplete()} once the transaction's commit * has started to run asynchronously. */ @@ -769,7 +769,7 @@ public class TestKuduTransaction { assertEquals(0, session.countPendingErrors()); final long tsBeforeCommit = client.getLastPropagatedTimestamp(); - txn.commit(true /*wait*/); + txn.commit(); final long tsAfterCommit = client.getLastPropagatedTimestamp(); assertTrue(tsAfterCommit > tsBeforeCommit); } @@ -792,7 +792,7 @@ public class TestKuduTransaction { assertEquals(0, session.countPendingErrors()); final long tsBeforeCommit = client.getLastPropagatedTimestamp(); - txn.commit(false /*wait*/); + txn.startCommit(); assertEquals(tsBeforeCommit, client.getLastPropagatedTimestamp()); assertEventuallyTrue("commit should eventually finalize", @@ -821,7 +821,7 @@ public class TestKuduTransaction { { KuduTransaction txn = client.newTransaction(); final long tsBeforeCommit = client.getLastPropagatedTimestamp(); - txn.commit(true /*wait*/); + txn.commit(); // Just in case, linger a bit after commit has been finalized, checking // for the timestamp propagated to the client side. @@ -882,7 +882,7 @@ public class TestKuduTransaction { harness.startAllMasterServers(); // It should be possible to commit the transaction. - txn.commit(true /*wait*/); + txn.commit(); // An extra sanity check: read back the rows written into the table in the // context of the transaction. @@ -912,7 +912,7 @@ public class TestKuduTransaction { // It should be possible to commit the transaction: 2 out of 3 masters are // running and Raft should be able to establish a leader master. So, // txn-related operations routed through TxnManager should succeed. - txn.commit(true /*wait*/); + txn.commit(); // An extra sanity check: read back the rows written into the table in the // context of the transaction. @@ -975,7 +975,7 @@ public class TestKuduTransaction { t.start(); // It should be possible to commit the transaction. - txn.commit(true /*wait*/); + txn.commit(); // Just an extra sanity check: the thread should join pretty fast, otherwise // the call to KuduTransaction.commit() above could not succeed. @@ -1048,7 +1048,7 @@ public class TestKuduTransaction { // // * the client switches to the new TxnManager for other txn-related // operations as well - txn.commit(true /*wait*/); + txn.commit(); // An extra sanity check: read back the rows written into the table in the // context of the transaction. @@ -1146,7 +1146,7 @@ public class TestKuduTransaction { // The transaction should be still alive, and it should be possible to // commit it. - txn.commit(true /*wait*/); + txn.commit(); t.join();