This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.15.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 07a283ee1c990ec9639622cf621f2d36385941d5 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Mon May 17 16:11:03 2021 -0700 KUDU-3277 fix concurrent session flush issue in Java client This patch fixes the issue reported by KUDU-3277. The bug manifested itself in rare cases when a session running in the AUTO_FLUSH_BACKGROUND mode was either being flushed explicitly or implicitly (i.e. upon closing the session) while the AUTO_FLUSH_BACKGROUND session's logic was flushing its data buffers concurrently as well. This patch also adds a reproduction scenario for KUDU-3277. The newly introduced test scenario was reliably failing before the fix: * 'java.lang.AssertionError: This Deferred was already called' messages were encountered in the log multiple times with the stack exactly as described in KUDU-3277 * some flusher threads were unable to join since KuduSession.flush() would hang (i.e. would not return) Change-Id: If6aaccc06abf1a2673620ab7c649f51f91999ad9 Reviewed-on: http://gerrit.cloudera.org:8080/17486 Tested-by: Alexey Serbin <aser...@cloudera.com> Reviewed-by: Grant Henke <granthe...@apache.org> (cherry picked from commit d1969e2623e30bfd62f2463f15e0bf02422d1d84) Reviewed-on: http://gerrit.cloudera.org:8080/17493 Reviewed-by: Bankim Bhavsar <ban...@cloudera.com> --- .../org/apache/kudu/client/AsyncKuduSession.java | 44 +++++-- .../org/apache/kudu/client/TestKuduClient.java | 134 ++++++++++++++++++++- 2 files changed, 170 insertions(+), 8 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java index 59eb838..3b0f5c0 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java @@ -508,8 +508,9 @@ public class AsyncKuduSession implements SessionConfiguration { * @param buffer the buffer to return to the inactive queue. */ private void queueBuffer(Buffer buffer) { - inactiveBuffers.add(buffer); - buffer.callbackFlushNotification(); + if (buffer.callbackFlushNotification()) { + inactiveBuffers.add(buffer); + } Deferred<Void> localFlushNotification = flushNotification.getAndSet(new Deferred<>()); localFlushNotification.callback(null); } @@ -861,6 +862,7 @@ public class AsyncKuduSession implements SessionConfiguration { private FlusherTask flusherTask = null; private Deferred<Void> flushNotification = Deferred.fromResult(null); + private boolean flushNotificationFired = false; public List<BufferedOperation> getOperations() { return operations; @@ -884,12 +886,27 @@ public class AsyncKuduSession implements SessionConfiguration { } /** - * Completes the buffer's flush notification. Should be called when the buffer has been - * successfully flushed. + * Completes the buffer's flush notification. Should be called when + * the buffer has been successfully flushed. */ - void callbackFlushNotification() { + boolean callbackFlushNotification() { LOG.trace("buffer flush notification fired: {}", this); - flushNotification.callback(null); + if (injectLatencyBufferFlushCb) { + try { + Thread.sleep(randomizer.nextInt(16)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + synchronized (monitor) { + if (flushNotificationFired) { + // Do nothing: the callback has been called already. + return false; + } + flushNotificationFired = true; + flushNotification.callback(null); + } + return true; } /** @@ -901,6 +918,7 @@ public class AsyncKuduSession implements SessionConfiguration { LOG.trace("buffer resetUnlocked: {}", this); operations.clear(); flushNotification = new Deferred<>(); + flushNotificationFired = false; flusherTask = null; } @@ -976,4 +994,16 @@ public class AsyncKuduSession implements SessionConfiguration { .toString(); } } -} + + private static boolean injectLatencyBufferFlushCb = false; + + /** + * Inject latency into {@link Buffer#callbackFlushNotification}. + */ + @InterfaceAudience.LimitedPrivate("Test") + static void injectLatencyBufferFlushCb(boolean injectLatency) { + injectLatencyBufferFlushCb = injectLatency; + LOG.warn("latency injection for Buffer flush notification is {}", + injectLatency ? "enabled" : "disabled"); + } +} \ No newline at end of file diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index 1d473bb..290ed3b 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -39,16 +39,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; import java.io.Closeable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.math.BigDecimal; import java.sql.Date; -import java.time.LocalDate; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -56,9 +57,11 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableList; import com.stumbleupon.async.Deferred; @@ -78,6 +81,7 @@ import org.apache.kudu.test.KuduTestHarness.LocationConfig; import org.apache.kudu.test.KuduTestHarness.MasterServerConfig; import org.apache.kudu.test.KuduTestHarness.TabletServerConfig; import org.apache.kudu.test.RandomUtils; +import org.apache.kudu.test.cluster.KuduBinaryInfo; import org.apache.kudu.util.DateUtil; import org.apache.kudu.util.DecimalUtil; import org.apache.kudu.util.TimestampUtil; @@ -1419,4 +1423,132 @@ public class TestKuduClient { OperationResponse respOld = session.apply(insertOld); assertFalse(respOld.hasRowError()); } + + /** + * This is a test scenario to reproduce conditions described in KUDU-3277. + * The scenario was failing before the fix: + * ** 'java.lang.AssertionError: This Deferred was already called' was + * encountered multiple times with the stack exactly as described in + * KUDU-3277 + * ** some flusher threads were unable to join since KuduSession.flush() + * would hang (i.e. would not return) + */ + @MasterServerConfig(flags = { + // A shorter TTL for tablet locations is necessary to induce more frequent + // calls to TabletLookupCB.queueBuffer(). + "--table_locations_ttl_ms=500", + }) + @Test(timeout = 100000) + public void testConcurrentFlush() throws Exception { + // This is a very intensive and stressful test scenario, so run it only + // against Kudu binaries built without sanitizers. + assumeTrue("this scenario is to run against non-sanitized binaries only", + KuduBinaryInfo.getSanitizerType() == KuduBinaryInfo.SanitizerType.NONE); + try { + AsyncKuduSession.injectLatencyBufferFlushCb(true); + + CreateTableOptions opts = new CreateTableOptions() + .addHashPartitions(ImmutableList.of("key"), 8) + .setRangePartitionColumns(ImmutableList.of("key")); + + Schema schema = ClientTestUtil.getBasicSchema(); + PartialRow lowerBoundA = schema.newPartialRow(); + PartialRow upperBoundA = schema.newPartialRow(); + upperBoundA.addInt("key", 0); + opts.addRangePartition(lowerBoundA, upperBoundA); + + PartialRow lowerBoundB = schema.newPartialRow(); + lowerBoundB.addInt("key", 0); + PartialRow upperBoundB = schema.newPartialRow(); + opts.addRangePartition(lowerBoundB, upperBoundB); + + KuduTable table = client.createTable(TABLE_NAME, schema, opts); + + final CountDownLatch keepRunning = new CountDownLatch(1); + final int numSessions = 50; + + List<KuduSession> sessions = new ArrayList<>(numSessions); + for (int i = 0; i < numSessions; ++i) { + KuduSession session = client.newSession(); + session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); + sessions.add(session); + } + + List<Thread> flushers = new ArrayList<>(numSessions); + Random random = RandomUtils.getRandom(); + { + for (int idx = 0; idx < numSessions; ++idx) { + final int threadIdx = idx; + Thread flusher = new Thread(new Runnable() { + @Override + public void run() { + KuduSession session = sessions.get(threadIdx); + try { + while (!keepRunning.await(random.nextInt(250), TimeUnit.MILLISECONDS)) { + session.flush(); + assertEquals(0, session.countPendingErrors()); + } + } catch (Exception e) { + fail("unexpected exception: " + e); + } + } + }); + flushers.add(flusher); + } + } + + final int numRowsPerSession = 10000; + final CountDownLatch insertersCompleted = new CountDownLatch(numSessions); + List<Thread> inserters = new ArrayList<>(numSessions); + { + for (int idx = 0; idx < numSessions; ++idx) { + final int threadIdx = idx; + final int keyStart = threadIdx * numRowsPerSession; + Thread inserter = new Thread(new Runnable() { + @Override + public void run() { + KuduSession session = sessions.get(threadIdx); + try { + for (int key = keyStart; key < keyStart + numRowsPerSession; ++key) { + Insert insert = ClientTestUtil.createBasicSchemaInsert(table, key); + assertNull(session.apply(insert)); + } + session.flush(); + } catch (Exception e) { + fail("unexpected exception: " + e); + } + insertersCompleted.countDown(); + } + }); + inserters.add(inserter); + } + } + + for (Thread flusher : flushers) { + flusher.start(); + } + for (Thread inserter : inserters) { + inserter.start(); + } + + // Wait for the inserter threads to finish. + insertersCompleted.await(); + // Signal the flusher threads to stop. + keepRunning.countDown(); + + for (Thread inserter : inserters) { + inserter.join(); + } + for (Thread flusher : flushers) { + flusher.join(); + } + + KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table) + .readMode(AsyncKuduScanner.ReadMode.READ_YOUR_WRITES) + .build(); + assertEquals(numSessions * numRowsPerSession, countRowsInScan(scanner)); + } finally { + AsyncKuduSession.injectLatencyBufferFlushCb(false); + } + } }