This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.15.x
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 07a283ee1c990ec9639622cf621f2d36385941d5
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Mon May 17 16:11:03 2021 -0700

    KUDU-3277 fix concurrent session flush issue in Java client
    
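    This patch fixes the issue reported in KUDU-3277.  The bug manifested
    itself in rare cases when a session running in the AUTO_FLUSH_BACKGROUND
    mode was flushed, either explicitly or implicitly (i.e. upon closing the
    session), while the session's background flushing logic was concurrently
    flushing its data buffers.  In that case both code paths could complete
    the same buffer's flush notification.  The second Deferred.callback()
    call failed with 'This Deferred was already called', and the buffer could
    be added to the queue of inactive buffers more than once.  With this
    patch, a buffer's flush notification fires at most once (checked and set
    while holding the lock on 'monitor').  The buffer is returned to the
    inactive queue only by the code path that actually fired the
    notification.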
    
    This patch also adds a reproduction scenario for KUDU-3277.  To make the
    race easier to hit, the scenario enables a test-only hook that injects a
    random delay (0-15 ms) before a buffer's flush notification is fired.
    Before the fix, the newly introduced scenario failed reliably:
    
      * 'java.lang.AssertionError: This Deferred was already called'
        messages appeared in the log multiple times, with the stack trace
        exactly as described in KUDU-3277
    
      * some flusher threads could not be joined because KuduSession.flush()
        hung (i.e. never returned)
    
    Change-Id: If6aaccc06abf1a2673620ab7c649f51f91999ad9
    Reviewed-on: http://gerrit.cloudera.org:8080/17486
    Tested-by: Alexey Serbin <aser...@cloudera.com>
    Reviewed-by: Grant Henke <granthe...@apache.org>
    (cherry picked from commit d1969e2623e30bfd62f2463f15e0bf02422d1d84)
    Reviewed-on: http://gerrit.cloudera.org:8080/17493
    Reviewed-by: Bankim Bhavsar <ban...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduSession.java   |  44 +++++--
 .../org/apache/kudu/client/TestKuduClient.java     | 134 ++++++++++++++++++++-
 2 files changed, 170 insertions(+), 8 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 59eb838..3b0f5c0 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -508,8 +508,9 @@ public class AsyncKuduSession implements 
SessionConfiguration {
      * @param buffer the buffer to return to the inactive queue.
      */
     private void queueBuffer(Buffer buffer) {
-      inactiveBuffers.add(buffer);
-      buffer.callbackFlushNotification();
+      if (buffer.callbackFlushNotification()) {
+        inactiveBuffers.add(buffer);
+      }
       Deferred<Void> localFlushNotification = flushNotification.getAndSet(new 
Deferred<>());
       localFlushNotification.callback(null);
     }
@@ -861,6 +862,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
     private FlusherTask flusherTask = null;
 
     private Deferred<Void> flushNotification = Deferred.fromResult(null);
+    private boolean flushNotificationFired = false;
 
     public List<BufferedOperation> getOperations() {
       return operations;
@@ -884,12 +886,27 @@ public class AsyncKuduSession implements 
SessionConfiguration {
     }
 
     /**
-     * Completes the buffer's flush notification. Should be called when the 
buffer has been
-     * successfully flushed.
+     * Completes the buffer's flush notification. Should be called when
+     * the buffer has been successfully flushed.
      */
-    void callbackFlushNotification() {
+    boolean callbackFlushNotification() {
       LOG.trace("buffer flush notification fired: {}", this);
-      flushNotification.callback(null);
+      if (injectLatencyBufferFlushCb) {
+        try {
+          Thread.sleep(randomizer.nextInt(16));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      synchronized (monitor) {
+        if (flushNotificationFired) {
+          // Do nothing: the callback has been called already.
+          return false;
+        }
+        flushNotificationFired = true;
+        flushNotification.callback(null);
+      }
+      return true;
     }
 
     /**
@@ -901,6 +918,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
       LOG.trace("buffer resetUnlocked: {}", this);
       operations.clear();
       flushNotification = new Deferred<>();
+      flushNotificationFired = false;
       flusherTask = null;
     }
 
@@ -976,4 +994,16 @@ public class AsyncKuduSession implements 
SessionConfiguration {
                         .toString();
     }
   }
-}
+
+  private static boolean injectLatencyBufferFlushCb = false;
+
+  /**
+   * Inject latency into {@link Buffer#callbackFlushNotification}.
+   */
+  @InterfaceAudience.LimitedPrivate("Test")
+  static void injectLatencyBufferFlushCb(boolean injectLatency) {
+    injectLatencyBufferFlushCb = injectLatency;
+    LOG.warn("latency injection for Buffer flush notification is {}",
+        injectLatency ? "enabled" : "disabled");
+  }
+}
\ No newline at end of file
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 1d473bb..290ed3b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -39,16 +39,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import java.io.Closeable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.sql.Date;
-import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,9 +57,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Deferred;
@@ -78,6 +81,7 @@ import org.apache.kudu.test.KuduTestHarness.LocationConfig;
 import org.apache.kudu.test.KuduTestHarness.MasterServerConfig;
 import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
 import org.apache.kudu.test.RandomUtils;
+import org.apache.kudu.test.cluster.KuduBinaryInfo;
 import org.apache.kudu.util.DateUtil;
 import org.apache.kudu.util.DecimalUtil;
 import org.apache.kudu.util.TimestampUtil;
@@ -1419,4 +1423,132 @@ public class TestKuduClient {
     OperationResponse respOld = session.apply(insertOld);
     assertFalse(respOld.hasRowError());
   }
+
+  /**
+   * This is a test scenario to reproduce conditions described in KUDU-3277.
+   * The scenario was failing before the fix:
+   *   ** 'java.lang.AssertionError: This Deferred was already called' was
+   *       encountered multiple times with the stack exactly as described in
+   *       KUDU-3277
+   *   ** some flusher threads were unable to join since KuduSession.flush()
+   *      would hang (i.e. would not return)
+   */
+  @MasterServerConfig(flags = {
+      // A shorter TTL for tablet locations is necessary to induce more 
frequent
+      // calls to TabletLookupCB.queueBuffer().
+      "--table_locations_ttl_ms=500",
+  })
+  @Test(timeout = 100000)
+  public void testConcurrentFlush() throws Exception {
+    // This is a very intensive and stressful test scenario, so run it only
+    // against Kudu binaries built without sanitizers.
+    assumeTrue("this scenario is to run against non-sanitized binaries only",
+        KuduBinaryInfo.getSanitizerType() == 
KuduBinaryInfo.SanitizerType.NONE);
+    try {
+      AsyncKuduSession.injectLatencyBufferFlushCb(true);
+
+      CreateTableOptions opts = new CreateTableOptions()
+          .addHashPartitions(ImmutableList.of("key"), 8)
+          .setRangePartitionColumns(ImmutableList.of("key"));
+
+      Schema schema = ClientTestUtil.getBasicSchema();
+      PartialRow lowerBoundA = schema.newPartialRow();
+      PartialRow upperBoundA = schema.newPartialRow();
+      upperBoundA.addInt("key", 0);
+      opts.addRangePartition(lowerBoundA, upperBoundA);
+
+      PartialRow lowerBoundB = schema.newPartialRow();
+      lowerBoundB.addInt("key", 0);
+      PartialRow upperBoundB = schema.newPartialRow();
+      opts.addRangePartition(lowerBoundB, upperBoundB);
+
+      KuduTable table = client.createTable(TABLE_NAME, schema, opts);
+
+      final CountDownLatch keepRunning = new CountDownLatch(1);
+      final int numSessions = 50;
+
+      List<KuduSession> sessions = new ArrayList<>(numSessions);
+      for (int i = 0; i < numSessions; ++i) {
+        KuduSession session = client.newSession();
+        
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+        sessions.add(session);
+      }
+
+      List<Thread> flushers = new ArrayList<>(numSessions);
+      Random random = RandomUtils.getRandom();
+      {
+        for (int idx = 0; idx < numSessions; ++idx) {
+          final int threadIdx = idx;
+          Thread flusher = new Thread(new Runnable() {
+            @Override
+            public void run() {
+              KuduSession session = sessions.get(threadIdx);
+              try {
+                while (!keepRunning.await(random.nextInt(250), 
TimeUnit.MILLISECONDS)) {
+                  session.flush();
+                  assertEquals(0, session.countPendingErrors());
+                }
+              } catch (Exception e) {
+                fail("unexpected exception: " + e);
+              }
+            }
+          });
+          flushers.add(flusher);
+        }
+      }
+
+      final int numRowsPerSession = 10000;
+      final CountDownLatch insertersCompleted = new 
CountDownLatch(numSessions);
+      List<Thread> inserters = new ArrayList<>(numSessions);
+      {
+        for (int idx = 0; idx < numSessions; ++idx) {
+          final int threadIdx = idx;
+          final int keyStart = threadIdx * numRowsPerSession;
+          Thread inserter = new Thread(new Runnable() {
+            @Override
+            public void run() {
+              KuduSession session = sessions.get(threadIdx);
+              try {
+                for (int key = keyStart; key < keyStart + numRowsPerSession; 
++key) {
+                  Insert insert = 
ClientTestUtil.createBasicSchemaInsert(table, key);
+                  assertNull(session.apply(insert));
+                }
+                session.flush();
+              } catch (Exception e) {
+                fail("unexpected exception: " + e);
+              }
+              insertersCompleted.countDown();
+            }
+          });
+          inserters.add(inserter);
+        }
+      }
+
+      for (Thread flusher : flushers) {
+        flusher.start();
+      }
+      for (Thread inserter : inserters) {
+        inserter.start();
+      }
+
+      // Wait for the inserter threads to finish.
+      insertersCompleted.await();
+      // Signal the flusher threads to stop.
+      keepRunning.countDown();
+
+      for (Thread inserter : inserters) {
+        inserter.join();
+      }
+      for (Thread flusher : flushers) {
+        flusher.join();
+      }
+
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+          .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES)
+          .build();
+      assertEquals(numSessions * numRowsPerSession, countRowsInScan(scanner));
+    } finally {
+      AsyncKuduSession.injectLatencyBufferFlushCb(false);
+    }
+  }
 }

Reply via email to