This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a7922c63d [tests] Fix flakiness in TestAsyncKuduSession.java
a7922c63d is described below

commit a7922c63d064f68a32b1829e6423cde68f76133b
Author: kedeng <kdeng...@gmail.com>
AuthorDate: Fri Jul 21 14:28:38 2023 +0800

    [tests] Fix flakiness in TestAsyncKuduSession.java
    
    The test testFlushBySize within TestAsyncKuduSession.java
    has been flaky with a 39% failure rate for the last weeks.
    
    The main reason for the failure of this case is that the
    newly added unit test did not consider the impact of the
    cache. I have modified the relevant code of the unit test
    to verify that the newly added flush strategy is indeed
    effective by performing a large number of insert operations
    exceeding the number of cache buffers.
    
    Change-Id: Iadc61fcddb9ffbfe05ef398ba61c79f8063d72de
    Reviewed-on: http://gerrit.cloudera.org:8080/20238
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 .../apache/kudu/client/TestAsyncKuduSession.java   | 35 ++++++++++++++--------
 1 file changed, 22 insertions(+), 13 deletions(-)

diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 49e3f0662..02876c5d5 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -396,7 +396,9 @@ public class TestAsyncKuduSession {
   public void testFlushBySize() throws Exception {
     AsyncKuduSession session = client.newSession();
     final int kBufferSizeOps = 10;
-    final int kNumOps = 2;
+    // Considering the existence of buffers, we set a number of operations 
that is significantly
+    // larger than the number of buffers to ensure that the buffers are 
triggered to flush.
+    final int kNumOps = 100;
     // Set a small buffer size so we should flush every time.
     session.setMutationBufferSpace(kBufferSizeOps, 1);
     // Set a large flush interval so if the flush by size function is not 
correctly implemented,
@@ -405,21 +407,28 @@ public class TestAsyncKuduSession {
     session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
 
     for (int i = 0; i < kNumOps; i++) {
-      // Should always flush immediately so here join will return soon.
-      OperationResponse resp = 
session.apply(createInsert(i)).join(DEFAULT_SLEEP);
-      assertFalse(resp.hasRowError());
+      // If the client tries to buffer many more operations, it may receive a
+      // PleaseThrottleException. In this case, if the client simply waits for 
a flush notification
+      // on the Deferred returned with the exception, it can continue to 
buffer operations.
+      Insert insert = createInsert(i);
+      try {
+        session.apply(insert);
+      } catch (PleaseThrottleException ex) {
+        ex.getDeferred().join(DEFAULT_SLEEP);
+        session.apply(insert);
+      }
     }
-    // Mode AUTO_FLUSH_BACKGROUND also takes time, so we may need wait here.
-    assertEventuallyTrue(String.format("Timeout for flush pending operations"),
-        new BooleanExpression() {
-          @Override
-          public boolean get() throws Exception {
-            return !session.hasPendingOperations();
-          }
-        }, /* timeoutMillis = */500000);
+    // There might be pending requests in the cache, but the above operation 
should not generate any
+    // errors.
     assertEquals(0, session.countPendingErrors());
     // Confirm that we can still make progress.
-    session.apply(createInsert(kNumOps)).join(DEFAULT_SLEEP);
+    Insert insert = createInsert(kNumOps);
+    try {
+      session.apply(insert);
+    } catch (PleaseThrottleException ex) {
+      ex.getDeferred().join(DEFAULT_SLEEP);
+      session.apply(insert);
+    }
 
     for (OperationResponse resp: session.flush().join(DEFAULT_SLEEP)) {
       assertFalse(resp.hasRowError());

Reply via email to