This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 0933f1b DRILL-7651: Increase timeout for TestLargeFileCompilation to avoid GitHub Action failures and fix concurrent issue in TestTpchDistributedConcurrent 0933f1b is described below commit 0933f1bd6f496fe02f28e719c48839430add5977 Author: Volodymyr Vysotskyi <vvo...@gmail.com> AuthorDate: Thu Mar 19 18:43:02 2020 +0200 DRILL-7651: Increase timeout for TestLargeFileCompilation to avoid GitHub Action failures and fix concurrent issue in TestTpchDistributedConcurrent --- .../drill/TestTpchDistributedConcurrent.java | 159 ++++++++++---------- .../exec/compile/TestLargeFileCompilation.java | 163 ++++++++++++--------- 2 files changed, 181 insertions(+), 141 deletions(-) diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java index ba3bd44..c144c53 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java @@ -26,18 +26,25 @@ import java.util.concurrent.Semaphore; import org.apache.drill.categories.SlowTest; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.test.BaseTestQuery.SilentListener; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClusterTest; import org.apache.drill.test.TestTools; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.rpc.user.UserResultsListener; -import org.apache.drill.test.BaseTestQuery; import org.apache.drill.test.QueryTestUtil; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -49,17 +56,18 @@ import static org.junit.Assert.assertNull; * any particular order of execution. We ignore the results. */ @Category({SlowTest.class}) -public class TestTpchDistributedConcurrent extends BaseTestQuery { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchDistributedConcurrent.class); +public class TestTpchDistributedConcurrent extends ClusterTest { + private static final Logger logger = LoggerFactory.getLogger(TestTpchDistributedConcurrent.class); - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(360000); // Longer timeout than usual. + @Rule + public final TestRule TIMEOUT = TestTools.getTimeoutRule(400_000); // 400 secs /* * Valid test names taken from TestTpchDistributed. Fuller path prefixes are * used so that tests may also be taken from other locations -- more variety * is better as far as this test goes. */ - private final static String queryFile[] = { + private static final String[] queryFile = { "queries/tpch/01.sql", "queries/tpch/03.sql", "queries/tpch/04.sql", @@ -80,45 +88,92 @@ public class TestTpchDistributedConcurrent extends BaseTestQuery { "queries/tpch/20.sql", }; - private final static int TOTAL_QUERIES = 115; - private final static int CONCURRENT_QUERIES = 15; - - private final static Random random = new Random(0xdeadbeef); - private final static String alterSession = "alter session set `planner.slice_target` = 10"; + private static final int TOTAL_QUERIES = 115; + private static final int CONCURRENT_QUERIES = 15; + private static final Random random = new Random(0xdeadbeef); private int remainingQueries = TOTAL_QUERIES - CONCURRENT_QUERIES; private final Semaphore completionSemaphore = new Semaphore(0); private final Semaphore submissionSemaphore = new Semaphore(0); private final Set<UserResultsListener> listeners = Sets.newIdentityHashSet(); + private final List<FailedQuery> failedQueries = new LinkedList<>(); private Thread testThread = null; // used to interrupt semaphore wait in case of error - private static class FailedQuery { - final String queryFile; - final UserException userEx; + @BeforeClass + public static void setUp() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .configProperty(ExecConstants.USER_RPC_TIMEOUT, 5_000); + startCluster(builder); + } - public FailedQuery(final String queryFile, final UserException userEx) { - this.queryFile = queryFile; - this.userEx = userEx; + @Test + public void testConcurrentQueries() { + client.alterSession(ExecConstants.SLICE_TARGET, 10); + + testThread = Thread.currentThread(); + final QuerySubmitter querySubmitter = new QuerySubmitter(); + querySubmitter.start(); + + // Kick off the initial queries. As they complete, they will submit more. + submissionSemaphore.release(CONCURRENT_QUERIES); + + // Wait for all the queries to complete. + InterruptedException interruptedException = null; + try { + completionSemaphore.acquire(TOTAL_QUERIES); + } catch (InterruptedException e) { + interruptedException = e; + + // List the failed queries. + for (FailedQuery fq : failedQueries) { + logger.error("{} failed with {}", fq.queryFile, fq.userEx); + } } - } - private final List<FailedQuery> failedQueries = new LinkedList<>(); + // Stop the querySubmitter thread. + querySubmitter.interrupt(); + + if (interruptedException != null) { + logger.error("Interruped Exception ", interruptedException); + } + + assertNull("Query error caused interruption", interruptedException); + + int nListeners = listeners.size(); + assertEquals(nListeners + " listeners still exist", 0, nListeners); + + assertEquals("Didn't submit all queries", 0, remainingQueries); + assertEquals("Queries failed", 0, failedQueries.size()); + } private void submitRandomQuery() { - final String filename = queryFile[random.nextInt(queryFile.length)]; - final String query; + String filename = queryFile[random.nextInt(queryFile.length)]; + String query; try { query = QueryTestUtil.normalizeQuery(getFile(filename)).replace(';', ' '); - } catch(IOException e) { + } catch (IOException e) { throw new RuntimeException("Caught exception", e); } - final UserResultsListener listener = new ChainingSilentListener(query); - client.runQuery(UserBitShared.QueryType.SQL, query, listener); - synchronized(this) { + UserResultsListener listener = new ChainingSilentListener(query); + queryBuilder() + .query(UserBitShared.QueryType.SQL, query) + .withListener(listener); + synchronized (this) { listeners.add(listener); } } + private static class FailedQuery { + final String queryFile; + final UserException userEx; + + public FailedQuery(String queryFile, UserException userEx) { + this.queryFile = queryFile; + this.userEx = userEx; + } + } + + private class ChainingSilentListener extends SilentListener { private final String query; @@ -130,8 +185,7 @@ public class TestTpchDistributedConcurrent extends BaseTestQuery { public void queryCompleted(QueryState state) { super.queryCompleted(state); - completionSemaphore.release(); - synchronized(TestTpchDistributedConcurrent.this) { + synchronized (TestTpchDistributedConcurrent.this) { final Object object = listeners.remove(this); assertNotNull("listener not found", object); @@ -146,6 +200,7 @@ public class TestTpchDistributedConcurrent extends BaseTestQuery { --remainingQueries; } } + completionSemaphore.release(); } @Override @@ -154,7 +209,7 @@ public class TestTpchDistributedConcurrent extends BaseTestQuery { completionSemaphore.release(); logger.error("submissionFailed for {} \nwith:", query, uex); - synchronized(TestTpchDistributedConcurrent.this) { + synchronized (TestTpchDistributedConcurrent.this) { final Object object = listeners.remove(this); assertNotNull("listener not found", object); failedQueries.add(new FailedQuery(query, uex)); @@ -166,10 +221,10 @@ public class TestTpchDistributedConcurrent extends BaseTestQuery { private class QuerySubmitter extends Thread { @Override public void run() { - while(true) { + while (true) { try { submissionSemaphore.acquire(); - } catch(InterruptedException e) { + } catch (InterruptedException e) { logger.error("QuerySubmitter quitting."); return; } @@ -178,50 +233,4 @@ public class TestTpchDistributedConcurrent extends BaseTestQuery { } } } - - @Test - public void testConcurrentQueries() throws Exception { - QueryTestUtil.testRunAndLog(client, UserBitShared.QueryType.SQL, alterSession); - - testThread = Thread.currentThread(); - final QuerySubmitter querySubmitter = new QuerySubmitter(); - querySubmitter.start(); - - // Kick off the initial queries. As they complete, they will submit more. - submissionSemaphore.release(CONCURRENT_QUERIES); - - // Wait for all the queries to complete. - InterruptedException interruptedException = null; - try { - completionSemaphore.acquire(TOTAL_QUERIES); - } catch(InterruptedException e) { - interruptedException = e; - - // List the failed queries. - for(final FailedQuery fq : failedQueries) { - logger.error(String.format("%s failed with %s", fq.queryFile, fq.userEx)); - } - } - - // Stop the querySubmitter thread. - querySubmitter.interrupt(); - - if (interruptedException != null) { - final StackTraceElement[] ste = interruptedException.getStackTrace(); - final StringBuilder sb = new StringBuilder(); - for(StackTraceElement s : ste) { - sb.append(s.toString()); - sb.append('\n'); - } - logger.error("Interruped Exception ", interruptedException); - } - - assertNull("Query error caused interruption", interruptedException); - - final int nListeners = listeners.size(); - assertEquals(nListeners + " listeners still exist", 0, nListeners); - - assertEquals("Didn't submit all queries", 0, remainingQueries); - assertEquals("Queries failed", 0, failedQueries.size()); - } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java index 35e58c1..7bd08ec 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java @@ -21,8 +21,14 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.test.BaseTestQuery; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClusterTest; import org.apache.drill.test.TestTools; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -30,8 +36,8 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; @Category({SlowTest.class}) -public class TestLargeFileCompilation extends BaseTestQuery { - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(200000); // 200 secs +public class TestLargeFileCompilation extends ClusterTest { + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(200_000); // 200 secs private static final String LARGE_QUERY_GROUP_BY; @@ -156,77 +162,107 @@ public class TestLargeFileCompilation extends BaseTestQuery { return sb.append("full_name\nfrom cp.`employee.json` limit 1)").toString(); } - @Ignore // TODO DRILL-5997 + @BeforeClass + public static void setUp() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .configProperty(ExecConstants.USER_RPC_TIMEOUT, 5_000); + startCluster(builder); + } + + @Before + public void setJDK() { + client.alterSession(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK"); + } + + @After + public void resetJDK() { + client.resetSession(ClassCompilerSelector.JAVA_COMPILER_OPTION); + } + @Test public void testTEXT_WRITER() throws Exception { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult("use dfs.tmp"); - testNoResult("alter session set `%s`='csv'", ExecConstants.OUTPUT_FORMAT_OPTION); - testNoResult(LARGE_QUERY_WRITER, "wide_table_csv"); + try { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv"); + + run("use dfs.tmp"); + run(LARGE_QUERY_WRITER, "wide_table_csv"); + } finally { + client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION); + } } @Test public void testPARQUET_WRITER() throws Exception { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult("use dfs.tmp"); - testNoResult("alter session set `%s`='parquet'", ExecConstants.OUTPUT_FORMAT_OPTION); - testNoResult(ITERATION_COUNT, LARGE_QUERY_WRITER, "wide_table_parquet"); + try { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "parquet"); + run("use dfs.tmp"); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(LARGE_QUERY_WRITER, "wide_table_parquet"); + } + } finally { + client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION); + } } - @Ignore // TODO DRILL-5997 @Test public void testGROUP_BY() throws Exception { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult(ITERATION_COUNT, LARGE_QUERY_GROUP_BY); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(LARGE_QUERY_GROUP_BY); + } } @Test public void testEXTERNAL_SORT() throws Exception { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(LARGE_QUERY_ORDER_BY); + } } @Test public void testTOP_N_SORT() throws Exception { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult(ITERATION_COUNT, LARGE_QUERY_ORDER_BY_WITH_LIMIT); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(LARGE_QUERY_ORDER_BY_WITH_LIMIT); + } } @Ignore // TODO DRILL-5997 @Test public void testFILTER() throws Exception { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult(ITERATION_COUNT, LARGE_QUERY_FILTER); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(LARGE_QUERY_FILTER); + } } @Test public void testClassTransformationOOM() throws Exception { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult(ITERATION_COUNT, QUERY_FILTER); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(QUERY_FILTER); + } } @Test public void testProject() throws Exception { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult(ITERATION_COUNT, LARGE_QUERY_SELECT_LIST); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(LARGE_QUERY_SELECT_LIST); + } } @Test public void testHashJoin() throws Exception { String tableName = "wide_table_hash_join"; try { - setSessionOption("drill.exec.hashjoin.fallback.enabled", true); - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult("alter session set `planner.enable_mergejoin` = false"); - testNoResult("alter session set `planner.enable_nestedloopjoin` = false"); - testNoResult("use dfs.tmp"); - testNoResult(LARGE_TABLE_WRITER, tableName); - testNoResult(QUERY_WITH_JOIN, tableName); + client.alterSession(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY, true); + client.alterSession(PlannerSettings.MERGEJOIN.getOptionName(), false); + client.alterSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName(), false); + + run("use dfs.tmp"); + run(LARGE_TABLE_WRITER, tableName); + run(QUERY_WITH_JOIN, tableName); } finally { - resetSessionOption("planner.enable_mergejoin"); - resetSessionOption("planner.enable_nestedloopjoin"); - resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult("drop table if exists %s", tableName); + client.resetSession(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY); + client.resetSession(PlannerSettings.MERGEJOIN.getOptionName()); + client.resetSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName()); + run("drop table if exists %s", tableName); } } @@ -234,17 +270,15 @@ public class TestLargeFileCompilation extends BaseTestQuery { public void testMergeJoin() throws Exception { String tableName = "wide_table_merge_join"; try { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult("alter session set `planner.enable_hashjoin` = false"); - testNoResult("alter session set `planner.enable_nestedloopjoin` = false"); - testNoResult("use dfs.tmp"); - testNoResult(LARGE_TABLE_WRITER, tableName); - testNoResult(QUERY_WITH_JOIN, tableName); + client.alterSession(PlannerSettings.HASHJOIN.getOptionName(), false); + client.alterSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName(), false); + run("use dfs.tmp"); + run(LARGE_TABLE_WRITER, tableName); + run(QUERY_WITH_JOIN, tableName); } finally { - resetSessionOption("planner.enable_hashjoin"); - resetSessionOption("planner.enable_nestedloopjoin"); - resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult("drop table if exists %s", tableName); + client.resetSession(PlannerSettings.HASHJOIN.getOptionName()); + client.resetSession(PlannerSettings.NESTEDLOOPJOIN.getOptionName()); + run("drop table if exists %s", tableName); } } @@ -252,39 +286,36 @@ public class TestLargeFileCompilation extends BaseTestQuery { public void testNestedLoopJoin() throws Exception { String tableName = "wide_table_loop_join"; try { - testNoResult("alter session set `%s`='JDK'", ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult("alter session set `planner.enable_nljoin_for_scalar_only` = false"); - testNoResult("alter session set `planner.enable_hashjoin` = false"); - testNoResult("alter session set `planner.enable_mergejoin` = false"); - testNoResult("use dfs.tmp"); - testNoResult(LARGE_TABLE_WRITER, tableName); - testNoResult(QUERY_WITH_JOIN, tableName); + client.alterSession(PlannerSettings.HASHJOIN.getOptionName(), false); + client.alterSession(PlannerSettings.MERGEJOIN.getOptionName(), false); + client.alterSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName(), false); + run("use dfs.tmp"); + run(LARGE_TABLE_WRITER, tableName); + run(QUERY_WITH_JOIN, tableName); } finally { - resetSessionOption("planner.enable_nljoin_for_scalar_only"); - resetSessionOption("planner.enable_hashjoin"); - resetSessionOption("planner.enable_mergejoin"); - resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION); - testNoResult("drop table if exists %s", tableName); + client.resetSession(PlannerSettings.HASHJOIN.getOptionName()); + client.resetSession(PlannerSettings.MERGEJOIN.getOptionName()); + client.resetSession(PlannerSettings.NLJOIN_FOR_SCALAR.getOptionName()); + run("drop table if exists %s", tableName); } } @Test public void testJDKHugeStringConstantCompilation() throws Exception { - try { - setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK"); - testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY); - } finally { - resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(HUGE_STRING_CONST_QUERY); } } @Test public void testJaninoHugeStringConstantCompilation() throws Exception { try { - setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO"); - testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY); + client.alterSession(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO"); + for (int i = 0; i < ITERATION_COUNT; i++) { + run(HUGE_STRING_CONST_QUERY); + } } finally { - resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION); + client.resetSession(ClassCompilerSelector.JAVA_COMPILER_OPTION); } } }