This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.15.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.15.x by this push:
     new a6de161  KUDU-2612 automatically flush sessions on txn commit (Java client)
a6de161 is described below

commit a6de16122066feb6951f68f557c28374a4e02840
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Thu May 13 16:56:42 2021 -0700

    KUDU-2612 automatically flush sessions on txn commit (Java client)
    
    With this patch, all transactional sessions created off a transaction
    handle are automatically flushed upon calling commit() on the handle.
    
    As for the KuduTransaction.startCommit() method, it's now necessary
    to flush all the transactional sessions created off the transaction
    handle before calling the method, otherwise a NonRecoverableException
    based on Status.IllegalState() is thrown.
    
    This patch also contains several test scenarios for the newly introduced
    functionality.
    
    This changelist is a Java client's counterpart for 45ca93f0e.
    
    Change-Id: I0c52578dd736906cf2610c8cc58496381a9b73ec
    Reviewed-on: http://gerrit.cloudera.org:8080/17452
    Tested-by: Alexey Serbin <aser...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Grant Henke <granthe...@apache.org>
    (cherry picked from commit 1f30113f8f8471cecd10caba9c43f6390c135ae7)
    Reviewed-on: http://gerrit.cloudera.org:8080/17471
    Tested-by: Kudu Jenkins
    Reviewed-by: Bankim Bhavsar <ban...@cloudera.com>
---
 .../org/apache/kudu/client/KuduTransaction.java    |  51 ++-
 .../apache/kudu/client/TestKuduTransaction.java    | 397 +++++++++++++++++++++
 2 files changed, 445 insertions(+), 3 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
index 4abf71f..ff59530 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -18,6 +18,8 @@
 package org.apache.kudu.client;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.CodedInputStream;
@@ -158,6 +160,9 @@ public class KuduTransaction implements AutoCloseable {
   private final Object isInFlightSync = new Object();
   private Timeout keepaliveTaskHandle = null;
   private final Object keepaliveTaskHandleSync = new Object();
+  private boolean isCommitStarted = false;
+  private final Object isCommitStartedSync = new Object();
+  private List<AsyncKuduSession> sessions = new ArrayList<>();
 
   /**
    * Create an instance of a transaction handle bound to the specified client.
@@ -213,6 +218,9 @@ public class KuduTransaction implements AutoCloseable {
    */
   void begin() throws KuduException {
     synchronized (isInFlightSync) {
+      // Perform a cursory state check to make sure begin() hasn't been called
+      // yet (this isn't intended to help if begin() is called concurrently from
+      // different threads, though).
       Preconditions.checkState(!isInFlight);
     }
 
@@ -239,9 +247,17 @@ public class KuduTransaction implements AutoCloseable {
    */
   public AsyncKuduSession newAsyncKuduSession() {
     synchronized (isInFlightSync) {
-      Preconditions.checkState(isInFlight);
+      Preconditions.checkState(isInFlight, ERRMSG_TXN_NOT_OPEN);
     }
-    return client.newTransactionalSession(txnId);
+
+    AsyncKuduSession session = null;
+    synchronized (isCommitStartedSync) {
+      Preconditions.checkState(!isCommitStarted, "commit already started");
+      session = client.newTransactionalSession(txnId);
+      sessions.add(session);
+    }
+    Preconditions.checkNotNull(session);
+    return session;
   }
 
   /**
@@ -488,7 +504,36 @@ public class KuduTransaction implements AutoCloseable {
   }
 
   private void commitWithMode(CommitMode mode) throws KuduException {
-    Preconditions.checkState(isInFlight, ERRMSG_TXN_NOT_OPEN);
+    synchronized (isInFlightSync) {
+      Preconditions.checkState(isInFlight, ERRMSG_TXN_NOT_OPEN);
+    }
+    synchronized (isCommitStartedSync) {
+      isCommitStarted = true;
+    }
+    for (AsyncKuduSession s : sessions) {
+      if (mode == CommitMode.WAIT_FOR_COMPLETION) {
+        // Flush each session's pending operations.
+        List<OperationResponse> results =
+            KuduClient.joinAndHandleException(s.flush());
+        for (OperationResponse result : results) {
+          if (result.hasRowError()) {
+            throw new NonRecoverableException(Status.Incomplete(String.format(
+                "failed to flush a transactional session: %s",
+                result.getRowError().toString())));
+          }
+        }
+      } else {
+        // Make sure no write operations are pending in any of the transactional
+        // sessions, i.e. everything has been flushed. This is rather a cursory
+        // check, it's not intended to protect against concurrent activity on
+        // transaction sessions when startCommit() is being called.
+        if (s.hasPendingOperations()) {
+          throw new NonRecoverableException(Status.IllegalState(
+              "cannot start committing transaction: at least one " +
+              "transactional session has write operations pending"));
+        }
+      }
+    }
     CommitTransactionRequest req = doCommitTransaction();
     // Now, there is no need to continue sending keepalive messages: the
     // transaction should be in COMMIT_IN_PROGRESS state after successful
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
index 3f753d3..2681483 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTransaction.java
@@ -1209,6 +1209,403 @@ public class TestKuduTransaction {
     assertEquals(numMasters, countRowsInScan(scanner));
   }
 
+  /**
+   * Make sure {@link KuduTransaction#commit} flushes pending operations
+   * for all sessions created off the {@link KuduTransaction} handle.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+  })
+  public void testFlushSessionsOnCommit() throws Exception {
+    final String TABLE_NAME = "flush_sessions_on_commit";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
2));
+    KuduTable table = client.openTable(TABLE_NAME);
+    int key = 0;
+
+    // Regardless of the flush mode, a transactional session is automatically
+    // flushed when the transaction is committed.
+    {
+      final SessionConfiguration.FlushMode[] kFlushModes = {
+          SessionConfiguration.FlushMode.MANUAL_FLUSH,
+          SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND,
+          SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC,
+      };
+
+      for (SessionConfiguration.FlushMode mode : kFlushModes) {
+        KuduTransaction txn = client.newTransaction();
+        KuduSession session = txn.newKuduSession();
+        session.setFlushMode(mode);
+        Insert insert = createBasicSchemaInsert(table, key++);
+        session.apply(insert);
+
+        if (mode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+          assertTrue(session.hasPendingOperations());
+        }
+
+        txn.commit();
+
+        assertFalse(session.hasPendingOperations());
+        assertEquals(0, session.getPendingErrors().getRowErrors().length);
+      }
+
+      // Make sure all the applied rows have been persisted.
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+          .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+          .build();
+      assertEquals(key, countRowsInScan(scanner));
+    }
+
+    // Make sure that all the transactional sessions are flushed upon 
committing
+    // a transaction.
+    {
+      KuduTransaction txn = client.newTransaction();
+      List<KuduSession> sessions = new ArrayList<>(10);
+      for (int i = 0; i < 10; ++i) {
+        KuduSession s = txn.newKuduSession();
+        s.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+        Insert insert = createBasicSchemaInsert(table, key++);
+        s.apply(insert);
+        assertTrue(s.hasPendingOperations());
+        sessions.add(s);
+      }
+
+      txn.commit();
+
+      for (KuduSession session : sessions) {
+        assertFalse(session.hasPendingOperations());
+        assertEquals(0, session.getPendingErrors().getRowErrors().length);
+      }
+
+      // Make sure all the applied rows have been persisted.
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+          .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+          .build();
+      assertEquals(key, countRowsInScan(scanner));
+    }
+
+    // Closing and flushing transactional sessions explicitly prior to commit
+    // is totally fine as well.
+    {
+      KuduTransaction txn = client.newTransaction();
+      {
+        KuduSession s = txn.newKuduSession();
+        s.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+        Insert insert = createBasicSchemaInsert(table, key++);
+        s.apply(insert);
+        s.close();
+      }
+      KuduSession session = txn.newKuduSession();
+      session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+      Insert insert = createBasicSchemaInsert(table, key++);
+      session.apply(insert);
+      session.flush();
+
+      txn.commit();
+
+      assertFalse(session.hasPendingOperations());
+      assertEquals(0, session.getPendingErrors().getRowErrors().length);
+
+      // Make sure all the applied rows have been persisted.
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+          .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+          .build();
+      assertEquals(key, countRowsInScan(scanner));
+    }
+  }
+
+  /**
+   * Make sure it's possible to recover from an error that occurred while flushing
+   * a transactional session: a transaction handle stays valid and it's possible
+   * to retry calling {@link KuduTransaction#commit()} after handling session
+   * flush errors.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+  })
+  public void testRetryCommitAfterSessionFlushError() throws Exception {
+    final String TABLE_NAME = "retry_commit_after_session_flush_error";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
2));
+    KuduTable table = client.openTable(TABLE_NAME);
+    int key = 0;
+
+    KuduTransaction txn = client.newTransaction();
+    KuduSession session = txn.newKuduSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    {
+      Insert insert = createBasicSchemaInsert(table, key);
+      session.apply(insert);
+    }
+    // Try to insert a row with a duplicate key.
+    {
+      Insert insert = createBasicSchemaInsert(table, key++);
+      session.apply(insert);
+    }
+
+    try {
+      txn.commit();
+      fail("committing a transaction with duplicate row should have failed");
+    } catch (NonRecoverableException e) {
+      final String errmsg = e.getMessage();
+      final Status status = e.getStatus();
+      assertTrue(status.toString(), status.isIncomplete());
+      assertTrue(errmsg, errmsg.matches(
+          "failed to flush a transactional session: .*"));
+    }
+
+    // Insert one more row using the same session.
+    {
+      Insert insert = createBasicSchemaInsert(table, key++);
+      session.apply(insert);
+    }
+
+    // Now, retry committing the transaction.
+    txn.commit();
+
+    assertEquals(0, session.getPendingErrors().getRowErrors().length);
+
+    // Make sure all the applied rows have been persisted.
+    KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+        .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+        .build();
+    assertEquals(key, countRowsInScan(scanner));
+  }
+
+  /**
+   * Make sure {@link KuduTransaction#startCommit} succeeds when called on
+   * a transaction handle which has all of its transactional sessions flushed.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+  })
+  public void testStartCommitWithFlushedSessions() throws Exception {
+    final String TABLE_NAME = "start_commit_with_flushed_sessions";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
2));
+    KuduTable table = client.openTable(TABLE_NAME);
+    int key = 0;
+
+    KuduTransaction txn = client.newTransaction();
+    {
+      KuduSession session = txn.newKuduSession();
+      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+      Insert insert = createBasicSchemaInsert(table, key++);
+      session.apply(insert);
+      assertFalse(session.hasPendingOperations());
+      assertEquals(0, session.getPendingErrors().getRowErrors().length);
+    }
+
+    KuduSession session = txn.newKuduSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    Insert insert = createBasicSchemaInsert(table, key);
+    session.apply(insert);
+    assertTrue(session.hasPendingOperations());
+    session.flush();
+    assertFalse(session.hasPendingOperations());
+
+    // KuduTransaction.startCommit() should succeed now.
+    txn.startCommit();
+  }
+
+  /**
+   * Check the behavior of {@link KuduTransaction#startCommit} when there are
+   * non-flushed transactional sessions started off a transaction handle.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+  })
+  public void testStartCommitWithNonFlushedSessions() throws Exception {
+    final String TABLE_NAME = "non_flushed_sessions_on_start_commit";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
2));
+    KuduTable table = client.openTable(TABLE_NAME);
+    int key = 0;
+
+    KuduTransaction txn = client.newTransaction();
+
+    // Create one session which will have no pending operations upon
+    // startCommit()
+    {
+      KuduSession session = txn.newKuduSession();
+      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+      Insert insert = createBasicSchemaInsert(table, key++);
+      session.apply(insert);
+      assertFalse(session.hasPendingOperations());
+      assertEquals(0, session.getPendingErrors().getRowErrors().length);
+    }
+
+    KuduSession session = txn.newKuduSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    Insert insert = createBasicSchemaInsert(table, key);
+    session.apply(insert);
+    assertTrue(session.hasPendingOperations());
+
+    try {
+      txn.startCommit();
+      fail("startCommit() should have failed when operations are pending");
+    } catch (NonRecoverableException e) {
+      final String errmsg = e.getMessage();
+      final Status status = e.getStatus();
+      assertTrue(status.toString(), status.isIllegalState());
+      assertTrue(errmsg, errmsg.matches(
+          ".* at least one transactional session has write operations 
pending"));
+    }
+
+    assertTrue(session.hasPendingOperations());
+    assertEquals(0, session.getPendingErrors().getRowErrors().length);
+  }
+
+  /**
+   * Verify the behavior of {@link KuduTransaction#newAsyncKuduSession} when 
the
+   * commit process has already been started for the corresponding transaction.
+   * This automatically verifies the behavior of
+   * {@link KuduTransaction#newKuduSession} because it works via
+   * {@link KuduTransaction#newAsyncKuduSession}.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+  })
+  public void testNewSessionAfterCommit() throws Exception {
+    final String TABLE_NAME = "new_session_after_commit";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
2));
+    KuduTable table = client.openTable(TABLE_NAME);
+    int key = 0;
+
+    {
+      KuduTransaction txn = client.newTransaction();
+      KuduSession session = txn.newKuduSession();
+      session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+      {
+        Insert insert = createBasicSchemaInsert(table, key);
+        session.apply(insert);
+      }
+      // Try to insert a row with a duplicate key.
+      {
+        Insert insert = createBasicSchemaInsert(table, key);
+        session.apply(insert);
+      }
+      try {
+        txn.commit();
+        fail("committing a transaction with duplicate row should have failed");
+      } catch (NonRecoverableException e) {
+        final String errmsg = e.getMessage();
+        final Status status = e.getStatus();
+        assertTrue(status.toString(), status.isIncomplete());
+        assertTrue(errmsg, errmsg.matches(
+            "failed to flush a transactional session: .*"));
+      }
+
+      try {
+        txn.newAsyncKuduSession();
+        fail("newKuduSession() should throw when transaction already 
committed");
+      } catch (IllegalStateException e) {
+        final String errmsg = e.getMessage();
+        assertTrue(errmsg, errmsg.matches("commit already started"));
+      }
+      txn.rollback();
+    }
+
+    {
+      KuduTransaction txn = client.newTransaction();
+      txn.commit();
+      try {
+        txn.newAsyncKuduSession();
+        fail("newKuduSession() should throw when transaction already 
committed");
+      } catch (IllegalStateException e) {
+        final String errmsg = e.getMessage();
+        assertTrue(errmsg, errmsg.matches(
+            "transaction is not open for this handle"));
+      }
+    }
+  }
+
+  /**
+   * This scenario is similar to the scenario above, but it calls
+   * {@link KuduTransaction#startCommit} instead of
+   * {@link KuduTransaction#commit}.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+  })
+  public void testCreateSessionAfterStartCommit() throws Exception {
+    KuduTransaction txn = client.newTransaction();
+    txn.startCommit();
+    try {
+      txn.newAsyncKuduSession();
+      fail("newKuduSession() should throw when transaction already committed");
+    } catch (IllegalStateException e) {
+      final String errmsg = e.getMessage();
+      assertTrue(errmsg, errmsg.matches(
+          "transaction is not open for this handle"));
+    }
+  }
+
+  /**
+   * A test scenario to verify the behavior of the client API when a write
+   * operation is submitted into a transaction session after the transaction
+   * has already been committed.
+   */
+  @Test(timeout = 100000)
+  @MasterServerConfig(flags = {
+      // TxnManager functionality is necessary for this scenario.
+      "--txn_manager_enabled",
+  })
+  public void testSubmitWriteOpAfterCommit() throws Exception {
+    final String TABLE_NAME = "submit_write_op_after_commit";
+    client.createTable(
+        TABLE_NAME,
+        ClientTestUtil.getBasicSchema(),
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
2));
+    KuduTable table = client.openTable(TABLE_NAME);
+    int key = 0;
+
+    KuduTransaction txn = client.newTransaction();
+    KuduSession session = txn.newKuduSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    {
+      Insert insert = createBasicSchemaInsert(table, key++);
+      session.apply(insert);
+    }
+
+    txn.commit();
+
+    {
+      Insert insert = createBasicSchemaInsert(table, key);
+      session.apply(insert);
+    }
+    List<OperationResponse> results = session.flush();
+    assertEquals(1, results.size());
+    OperationResponse rowResult = results.get(0);
+    assertTrue(rowResult.hasRowError());
+    String errmsg = rowResult.getRowError().toString();
+    assertTrue(errmsg, errmsg.matches(
+        ".* transaction ID .* not open: COMMITTED .*"));
+  }
+
   // TODO(aserbin): when test harness allows for sending Kudu servers 
particular
   //                signals, add a test scenario to verify that timeout for
   //                TxnManager request is set low enough to detect 'frozen'

Reply via email to