This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new b784a41 KUDU-2612 register txn Java sessions with client b784a41 is described below commit b784a41290365d7eacb10048b45ea86708b5b314 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Fri May 14 19:22:24 2021 -0700 KUDU-2612 register txn Java sessions with client Prior to this patch, the assertion in AsyncKuduClient.removeSession() would trigger upon closing a transactional session via {AsyncKuduSession,KuduSession}.close(). This patch addresses the issue by registering the newly created transactional session with the corresponding AsyncKuduClient. In addition, a test scenario is added to catch regressions. This is a follow-up to 7432a7a8aa0c98f8d2177c25b99576b51ff33a93. Change-Id: I117abc938bca0e698854d944c3fd6f831d3f9ce0 Reviewed-on: http://gerrit.cloudera.org:8080/17451 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <aw...@cloudera.com> --- .../org/apache/kudu/client/AsyncKuduClient.java | 11 +++++ .../org/apache/kudu/client/KuduTransaction.java | 7 +-- .../apache/kudu/client/TestKuduTransaction.java | 51 ++++++++++++++++++++++ 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index 35f8477..cfa03cd 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -2614,6 +2614,17 @@ public class AsyncKuduClient implements AutoCloseable { return closeAllSessions().addCallbackDeferring(new DisconnectCB()); } + // Create a new transactional session in the context of the transaction + // with the specified identifier. 
+ AsyncKuduSession newTransactionalSession(long txnId) { + checkIsClosed(); + AsyncKuduSession session = new AsyncKuduSession(this, txnId); + synchronized (sessions) { + sessions.add(session); + } + return session; + } + private void checkIsClosed() { if (closed) { throw new IllegalStateException("Cannot proceed, the client has already been closed"); diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java index 9ea71b9..4abf71f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java @@ -241,7 +241,7 @@ public class KuduTransaction implements AutoCloseable { synchronized (isInFlightSync) { Preconditions.checkState(isInFlight); } - return new AsyncKuduSession(client, txnId); + return client.newTransactionalSession(txnId); } /** @@ -253,10 +253,7 @@ public class KuduTransaction implements AutoCloseable { * @return a new {@link KuduSession} instance */ public KuduSession newKuduSession() { - synchronized (isInFlightSync) { - Preconditions.checkState(isInFlight); - } - return new KuduSession(new AsyncKuduSession(client, txnId)); + return new KuduSession(newAsyncKuduSession()); } /** diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java index 0a97e8d..3f753d3 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java @@ -227,6 +227,57 @@ public class TestKuduTransaction { } /** + * Transactional sessions can be closed as regular ones. 
+ */ + @Test(timeout = 100000) + @MasterServerConfig(flags = { + "--txn_manager_enabled", + }) + public void testTxnSessionClose() throws Exception { + final String TABLE_NAME = "txn_session_close"; + client.createTable( + TABLE_NAME, + ClientTestUtil.getBasicSchema(), + new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2)); + KuduTable table = client.openTable(TABLE_NAME); + + // Open and close an empty transaction session. + { + KuduTransaction txn = client.newTransaction(); + assertNotNull(txn); + KuduSession session = txn.newKuduSession(); + assertNotNull(session); + assertFalse(session.isClosed()); + session.close(); + assertTrue(session.isClosed()); + } + + // Open new transaction, insert one row for a session, close the session + // and then rollback the transaction. No rows should be persisted. + { + KuduTransaction txn = client.newTransaction(); + assertNotNull(txn); + KuduSession session = txn.newKuduSession(); + assertNotNull(session); + session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); + + Insert insert = createBasicSchemaInsert(table, 1); + session.apply(insert); + session.close(); + + txn.rollback(); + + assertTrue(session.isClosed()); + assertEquals(0, session.countPendingErrors()); + + KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table) + .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES) + .build(); + assertEquals(0, countRowsInScan(scanner)); + } + } + + /** * Test scenario that starts a new transaction, initiates its commit phase, * and checks whether the commit is complete using the * KuduTransaction.isCommitComplete() method.