This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new a7922c63d [tests] Fix flakiness in TestAsyncKuduSession.java a7922c63d is described below commit a7922c63d064f68a32b1829e6423cde68f76133b Author: kedeng <kdeng...@gmail.com> AuthorDate: Fri Jul 21 14:28:38 2023 +0800 [tests] Fix flakiness in TestAsyncKuduSession.java The test testFlushBySize within TestAsyncKuduSession.java has been flaky with a 39% failure rate for the last weeks. The main reason for the failure of this case is that the newly added unit test did not consider the impact of the cache. I have modified the relevant code of the unit test to verify that the newly added flush strategy is indeed effective by performing a large number of insert operations exceeding the number of cache buffers. Change-Id: Iadc61fcddb9ffbfe05ef398ba61c79f8063d72de Reviewed-on: http://gerrit.cloudera.org:8080/20238 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> --- .../apache/kudu/client/TestAsyncKuduSession.java | 35 ++++++++++++++-------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java index 49e3f0662..02876c5d5 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java @@ -396,7 +396,9 @@ public class TestAsyncKuduSession { public void testFlushBySize() throws Exception { AsyncKuduSession session = client.newSession(); final int kBufferSizeOps = 10; - final int kNumOps = 2; + // Considering the existence of buffers, we set a number of operations that is significantly + // larger than the number of buffers to ensure that the buffers are triggered to flush. + final int kNumOps = 100; // Set a small buffer size so we should flush every time. session.setMutationBufferSpace(kBufferSizeOps, 1); // Set a large flush interval so if the flush by size function is not correctly implemented, @@ -405,21 +407,28 @@ public class TestAsyncKuduSession { session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); for (int i = 0; i < kNumOps; i++) { - // Should always flush immediately so here join will return soon. - OperationResponse resp = session.apply(createInsert(i)).join(DEFAULT_SLEEP); - assertFalse(resp.hasRowError()); + // If the client tries to buffer many more operations, it may receive a + // PleaseThrottleException. In this case, if the client simply waits for a flush notification + // on the Deferred returned with the exception, it can continue to buffer operations. + Insert insert = createInsert(i); + try { + session.apply(insert); + } catch (PleaseThrottleException ex) { + ex.getDeferred().join(DEFAULT_SLEEP); + session.apply(insert); + } } - // Mode AUTO_FLUSH_BACKGROUND also takes time, so we may need wait here. - assertEventuallyTrue(String.format("Timeout for flush pending operations"), - new BooleanExpression() { - @Override - public boolean get() throws Exception { - return !session.hasPendingOperations(); - } - }, /* timeoutMillis = */500000); + // There might be pending requests in the cache, but the above operation should not generate any + // errors. assertEquals(0, session.countPendingErrors()); // Confirm that we can still make progress. - session.apply(createInsert(kNumOps)).join(DEFAULT_SLEEP); + Insert insert = createInsert(kNumOps); + try { + session.apply(insert); + } catch (PleaseThrottleException ex) { + ex.getDeferred().join(DEFAULT_SLEEP); + session.apply(insert); + } for (OperationResponse resp: session.flush().join(DEFAULT_SLEEP)) { assertFalse(resp.hasRowError());