This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 0a32542 KUDU-2612: Java client failover scenarios for TxnManager 0a32542 is described below commit 0a32542ca5669c842e91b1b736d108a7ae84ec5c Author: Alexey Serbin <ale...@apache.org> AuthorDate: Sun Mar 7 21:42:26 2021 -0800 KUDU-2612: Java client failover scenarios for TxnManager Added a couple of test scenarios to verify that Java client automatically switches to other available TxnManager for performing txn-related operations. Sending txn keep-alive messages isn't covered yet: I'm planning to address these separately since some extra changes are needed there which would be easier to review in a separate patch. I also updated the inline doc for KuduTransaction.isCommitComplete() to be more explicit about possible exceptions thrown. Change-Id: I9c7d73ce4a74d426286facbf02dff6e48b46a7c0 Reviewed-on: http://gerrit.cloudera.org:8080/17297 Tested-by: Alexey Serbin <aser...@cloudera.com> Reviewed-by: Andrew Wong <aw...@cloudera.com> --- .../org/apache/kudu/client/KuduTransaction.java | 6 + .../main/java/org/apache/kudu/client/RpcProxy.java | 2 +- .../apache/kudu/client/TestKuduTransaction.java | 174 ++++++++++++++++++++- 3 files changed, 177 insertions(+), 5 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java index 2e7bf41..ed61497 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java @@ -301,6 +301,12 @@ public class KuduTransaction implements AutoCloseable { * Check whether the commit phase for a transaction is complete. 
* * @return {@code true} if transaction has finalized, otherwise {@code false} + * @throws NonRecoverableException with Status.Aborted() + * if transaction has been or is being aborted + * @throws NonRecoverableException with Status.IllegalState() + * if transaction is still open (i.e. commit() hasn't been called yet) + * @throws NonRecoverableException with Status.NotSupported() + * if transaction is in unexpected state (non-compatible backend?) * @throws KuduException if an error happens while querying the system about * the state of the transaction */ diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java index 28784ab..6c4350f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java @@ -442,7 +442,7 @@ class RpcProxy { // TODO(aserbin): try sending request to other TxnManager instance, // if possible. The idea is that Kudu clusters are expected - // expected to have multiple masters, so if one TxnManager + // to have multiple masters, so if one TxnManager // instance is not available, there is a high chance that // others are still available (TxnManager is hosted by a // kudu-master process). 
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java index 5b8b035..5c97e54 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java @@ -17,6 +17,7 @@ package org.apache.kudu.client; +import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -28,6 +29,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import com.google.common.collect.ImmutableList; import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; import org.junit.Before; @@ -35,6 +37,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.function.ThrowingRunnable; +import org.apache.kudu.test.ClientTestUtil; import org.apache.kudu.test.KuduTestHarness; import org.apache.kudu.test.KuduTestHarness.MasterServerConfig; import org.apache.kudu.test.KuduTestHarness.TabletServerConfig; @@ -446,7 +449,7 @@ public class TestKuduTransaction { */ @Test(timeout = 100000) @MasterServerConfig(flags = { - "--txn_manager_enabled=true", + "--txn_manager_enabled", }) @TabletServerConfig(flags = { "--txn_schedule_background_tasks=false" @@ -497,7 +500,7 @@ public class TestKuduTransaction { */ @Test(timeout = 100000) @MasterServerConfig(flags = { - "--txn_manager_enabled=true", + "--txn_manager_enabled", }) public void testSerializationOptions() throws Exception { final KuduTransaction txn = client.newTransaction(); @@ -560,7 +563,7 @@ public class TestKuduTransaction { */ @Test(timeout = 100000) @MasterServerConfig(flags = { - "--txn_manager_enabled=true", + "--txn_manager_enabled", }) @TabletServerConfig(flags = { "--txn_keepalive_interval_ms=200", @@ -628,7 
+631,7 @@ public class TestKuduTransaction { */ @Test(timeout = 100000) @MasterServerConfig(flags = { - "--txn_manager_enabled=true", + "--txn_manager_enabled", }) @TabletServerConfig(flags = { "--txn_keepalive_interval_ms=200", @@ -713,4 +716,167 @@ public class TestKuduTransaction { txn.isCommitComplete(); } } + + /** + * Test to verify that Kudu client is able to switch to TxnManager hosted by + * other kudu-master process when the previously used one isn't available. + */ + @Test(timeout = 100000) + @MasterServerConfig(flags = { + // TxnManager functionality is necessary for this scenario. + "--txn_manager_enabled", + + // Set Raft heartbeat interval short for faster test runtime: speed up + // leader failure detection and new leader election. + "--raft_heartbeat_interval_ms=100", + }) + public void testSwitchToOtherTxnManager() throws Exception { + final String TABLE_NAME = "txn_manager_ops_fallback"; + client.createTable( + TABLE_NAME, + ClientTestUtil.getBasicSchema(), + new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2)); + + // Start a transaction, then restart every available TxnManager instance + // before attempting any txn-related operation. + { + KuduTransaction txn = client.newTransaction(); + KuduSession session = txn.newKuduSession(); + + KuduTable table = client.openTable(TABLE_NAME); + + Insert insert = createBasicSchemaInsert(table, 0); + session.apply(insert); + session.flush(); + + harness.killAllMasterServers(); + harness.startAllMasterServers(); + + // Querying the status of a transaction should be possible, as usual. + // Since the transaction is still open, KuduTransaction.isCommitComplete() + // should throw corresponding exception with Status.IllegalState. 
+ try { + txn.isCommitComplete(); + fail("KuduTransaction.isCommitComplete should have thrown"); + } catch (NonRecoverableException e) { + assertTrue(e.getStatus().toString(), e.getStatus().isIllegalState()); + assertEquals("transaction is still open", e.getMessage()); + } + + harness.killAllMasterServers(); + harness.startAllMasterServers(); + + // It should be possible to commit the transaction. + txn.commit(true /*wait*/); + + // An extra sanity check: read back the row written into the table in the + // context of the transaction. + KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table) + .replicaSelection(ReplicaSelection.LEADER_ONLY) + .build(); + + assertEquals(1, scanner.nextRows().getNumRows()); + } + + // Similar to the above, but run KuduTransaction.commit() when only 2 out + // of 3 masters are running while the TxnManager which used to start the + // transaction is no longer around. + { + KuduTransaction txn = client.newTransaction(); + KuduSession session = txn.newKuduSession(); + + KuduTable table = client.openTable(TABLE_NAME); + + Insert insert = createBasicSchemaInsert(table, 1); + session.apply(insert); + session.flush(); + + harness.killLeaderMasterServer(); + + // It should be possible to commit the transaction: 2 out of 3 masters are + // running and Raft should be able to establish a leader master. So, + // txn-related operations routed through TxnManager should succeed. + txn.commit(true /*wait*/); + + // An extra sanity check: read back the row written into the table in the + // context of the transaction. + KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table) + .replicaSelection(ReplicaSelection.LEADER_ONLY) + .build(); + + // It's an empty transaction, and 1 row should be there from the prior + // sub-scenario. 
+ assertEquals(1, scanner.nextRows().getNumRows()); + } + } + + /** + * Test to verify that Kudu client is able to switch to TxnManager hosted by + * other kudu-master process when the previously used one isn't available, + * even if txn-related calls first are issued when no TxnManager was running. + */ + @Test(timeout = 100000) + @MasterServerConfig(flags = { + // TxnManager functionality is necessary for this scenario. + "--txn_manager_enabled", + + // Set Raft heartbeat interval short for faster test runtime: speed up + // leader failure detection and new leader election. + "--raft_heartbeat_interval_ms=100", + }) + public void testSwitchToOtherTxnManagerInFlightCalls() throws Exception { + final String TABLE_NAME = "txn_manager_ops_fallback_inflight"; + client.createTable( + TABLE_NAME, + ClientTestUtil.getBasicSchema(), + new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2)); + + KuduTransaction txn = client.newTransaction(); + KuduSession session = txn.newKuduSession(); + + KuduTable table = client.openTable(TABLE_NAME); + + Insert insert = createBasicSchemaInsert(table, 0); + session.apply(insert); + session.flush(); + + harness.killAllMasterServers(); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + // Sleep for some time to allow the KuduTransaction.commit() call + // below to issue RPCs to non-running TxnManagers. + Thread.sleep(1000); + harness.startAllMasterServers(); + } catch (Exception e) { + fail("failed to start all masters: " + e); + } + } + }); + t.start(); + + // It should be possible to commit the transaction. + txn.commit(true /*wait*/); + + // Just an extra sanity check: the thread should join pretty fast, otherwise + // the call to KuduTransaction.commit() above could not succeed. + t.join(250); + + // An extra sanity check: read back the row written into the table in the + // context of the transaction. 
+ KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table) + .replicaSelection(ReplicaSelection.LEADER_ONLY) + .build(); + + assertEquals(1, scanner.nextRows().getNumRows()); + } + + // TODO(aserbin): when test harness allows for sending Kudu servers particular + // signals, add a test scenario to verify that timeout for + // TxnManager request is set low enough to detect 'frozen' + // TxnManager instance (e.g., sent SIGSTOP signal), and is able + // to switch to another TxnManager to send txn keepalive + // requests fast enough to keep the transaction alive. }