Repository: hive Updated Branches: refs/heads/branch-3 c6ebe0f74 -> 06d8f362e
HIVE-19864: Address TestTriggersWorkloadManager flakiness (Prasanth Jayachandran reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/06d8f362 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/06d8f362 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/06d8f362 Branch: refs/heads/branch-3 Commit: 06d8f362edd46d573c79df3eb1ca77cba285ae6e Parents: c6ebe0f Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Thu Jun 14 23:00:56 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Thu Jun 14 23:01:14 2018 -0700 ---------------------------------------------------------------------- .../hive/jdbc/AbstractJdbcTriggersTest.java | 85 +++++++---- .../jdbc/TestTriggersMoveWorkloadManager.java | 9 ++ .../hive/jdbc/TestTriggersNoTezSessionPool.java | 31 ++-- .../jdbc/TestTriggersTezSessionPoolManager.java | 150 +++++++++++++------ .../hive/jdbc/TestTriggersWorkloadManager.java | 9 ++ 5 files changed, 201 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/06d8f362/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java index 7d5172b..aa9893e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java @@ -47,8 +47,11 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractJdbcTriggersTest { + private final Logger LOG = LoggerFactory.getLogger(getClass().getName()); protected static MiniHS2 miniHS2 = null; protected static String dataFileDir; static Path kvDataFilePath; @@ -100,12 +103,14 @@ public abstract class AbstractJdbcTriggersTest { } @AfterClass - public static void afterTest() throws Exception { + public static void afterTest() { if (miniHS2.isStarted()) { miniHS2.stop(); } } + public abstract String getTestName(); + private void createSleepUDF() throws SQLException { String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); Connection con = hs2Conn; @@ -115,8 +120,40 @@ public abstract class AbstractJdbcTriggersTest { } void runQueryWithTrigger(final String query, final List<String> setCmds, - final String expect) throws Exception { - runQueryWithTrigger(query, setCmds, expect, null); + final String expect, final int queryTimeoutSecs) throws Exception { + String testName = getTestName(); + long start = System.currentTimeMillis(); + LOG.info("Start of test: {}", testName); + Connection con = hs2Conn; + BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); + createSleepUDF(); + final Statement selStmt = con.createStatement(); + Throwable throwable = null; + try { + if (queryTimeoutSecs > 0) { + selStmt.setQueryTimeout(queryTimeoutSecs); + } + if (setCmds != null) { + for (String setCmd : setCmds) { + selStmt.execute(setCmd); + } + } + selStmt.execute(query); + } catch (SQLException e) { + throwable = e; + } + selStmt.close(); + + if (expect == null) { + assertNull("Expected query to succeed", throwable); + } else { + assertNotNull("Expected non-null throwable", throwable); + assertEquals(SQLException.class, throwable.getClass()); + assertTrue(expect + " is not contained in " + throwable.getMessage(), + throwable.getMessage().contains(expect)); + } + long end = System.currentTimeMillis(); + LOG.info("End of test: {} time: {} ms", testName, (end - start)); } void runQueryWithTrigger(final String query, final List<String> setCmds, @@ -130,32 +167,27 @@ public abstract class AbstractJdbcTriggersTest { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); System.setErr(new PrintStream(baos)); // capture stderr final Statement selStmt = con.createStatement(); - final Throwable[] throwable = new Throwable[1]; + Throwable throwable = null; try { - Thread queryThread = new Thread(() -> { - try { - if (setCmds != null) { - for (String setCmd : setCmds) { - selStmt.execute(setCmd); - } + try { + if (setCmds != null) { + for (String setCmd : setCmds) { + selStmt.execute(setCmd); } - selStmt.execute(query); - } catch (SQLException e) { - throwable[0] = e; } - }); - queryThread.start(); - - queryThread.join(); + selStmt.execute(query); + } catch (SQLException e) { + throwable = e; + } selStmt.close(); if (expect == null) { - assertNull("Expected query to succeed", throwable[0]); + assertNull("Expected query to succeed", throwable); } else { - assertNotNull("Expected non-null throwable", throwable[0]); - assertEquals(SQLException.class, throwable[0].getClass()); - assertTrue(expect + " is not contained in " + throwable[0].getMessage(), - throwable[0].getMessage().contains(expect)); + assertNotNull("Expected non-null throwable", throwable); + assertEquals(SQLException.class, throwable.getClass()); + assertTrue(expect + " is not contained in " + throwable.getMessage(), + throwable.getMessage().contains(expect)); } if (errCaptureExpect != null && !errCaptureExpect.isEmpty()) { @@ -173,7 +205,6 @@ public abstract class AbstractJdbcTriggersTest { } finally { baos.close(); } - } abstract void setupTriggers(final List<Trigger> triggers) throws Exception; @@ -181,10 +212,10 @@ public abstract class AbstractJdbcTriggersTest { List<String> getConfigs(String... more) { List<String> setCmds = new ArrayList<>(); setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); - setCmds.add("set mapred.min.split.size=100"); - setCmds.add("set mapred.max.split.size=100"); - setCmds.add("set tez.grouping.min-size=100"); - setCmds.add("set tez.grouping.max-size=100"); + setCmds.add("set mapred.min.split.size=200"); + setCmds.add("set mapred.max.split.size=200"); + setCmds.add("set tez.grouping.min-size=200"); + setCmds.add("set tez.grouping.max-size=200"); if (more != null) { setCmds.addAll(Arrays.asList(more)); } http://git-wip-us.apache.org/repos/asf/hive/blob/06d8f362/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java index 5df5ede..ad5aa18 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java @@ -43,13 +43,22 @@ import org.apache.hive.common.util.RetryTestRunner; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.junit.runner.RunWith; import com.google.common.collect.Lists; @RunWith(RetryTestRunner.class) public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest { + @Rule + public TestName testName = new TestName(); + + @Override + public String getTestName() { + return getClass().getSimpleName() + "#" + testName.getMethodName(); + } @BeforeClass public static void beforeTest() throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/06d8f362/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java index 2117b68..aea4fe7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java @@ -16,22 +16,31 @@ package org.apache.hive.jdbc; -import org.apache.hadoop.hive.metastore.api.WMTrigger; - import java.util.List; + import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.api.WMResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; import org.apache.hadoop.hive.ql.wm.Trigger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; + import com.google.common.collect.Lists; public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest { + @Rule + public TestName testName = new TestName(); + + @Override + public String getTestName() { + return getClass().getSimpleName() + "#" + testName.getMethodName(); + } @Test(timeout = 60000) public void testTriggerSlowQueryExecutionTime() throws Exception { @@ -40,41 +49,41 @@ public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, trigger + " violated"); + runQueryWithTrigger(query, null, trigger + " violated", 50); } @Test(timeout = 60000) public void testTriggerVertexTotalTasks() throws Exception { - Expression expression = ExpressionFactory.fromString("VERTEX_TOTAL_TASKS > 50"); + Expression expression = ExpressionFactory.fromString("VERTEX_TOTAL_TASKS > 20"); Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), trigger + " violated"); + runQueryWithTrigger(query, getConfigs(), trigger + " violated", 50); } @Test(timeout = 60000) public void testTriggerDAGTotalTasks() throws Exception { - Expression expression = ExpressionFactory.fromString("DAG_TOTAL_TASKS > 50"); + Expression expression = ExpressionFactory.fromString("DAG_TOTAL_TASKS > 20"); Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), trigger + " violated"); + runQueryWithTrigger(query, getConfigs(), trigger + " violated", 50); } @Test(timeout = 60000) public void testTriggerTotalLaunchedTasks() throws Exception { - Expression expression = ExpressionFactory.fromString("TOTAL_LAUNCHED_TASKS > 50"); + Expression expression = ExpressionFactory.fromString("TOTAL_LAUNCHED_TASKS > 20"); Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), trigger + " violated"); + runQueryWithTrigger(query, getConfigs(), trigger + " violated", 50); } @Override - void setupTriggers(final List<Trigger> triggers) throws Exception { + void setupTriggers(final List<Trigger> triggers) { WMFullResourcePlan rp = new WMFullResourcePlan( new WMResourcePlan("rp"), null); for (Trigger trigger : triggers) { http://git-wip-us.apache.org/repos/asf/hive/blob/06d8f362/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index de0f31e..afea596 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -16,23 +16,32 @@ package org.apache.hive.jdbc; -import org.apache.hadoop.hive.metastore.api.WMTrigger; - import java.util.ArrayList; import java.util.List; + import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.api.WMResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; import org.apache.hadoop.hive.ql.wm.Trigger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; + import com.google.common.collect.Lists; public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest { + @Rule + public TestName testName = new TestName(); + + @Override + public String getTestName() { + return getClass().getSimpleName() + "#" + testName.getMethodName(); + } @Test(timeout = 120000) public void testTriggerSlowQueryElapsedTime() throws Exception { @@ -41,7 +50,10 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, trigger + " violated"); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -51,7 +63,10 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, trigger + " violated"); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -61,7 +76,10 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, trigger + " violated"); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -71,12 +89,14 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); List<String> cmds = new ArrayList<>(); cmds.add("set hive.auto.convert.join=false"); + cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); // to slow down the reducer so that SHUFFLE_BYTES publishing and validation can happen, adding sleep between // multiple reduce stages String query = "select count(distinct t.under_col), sleep(t.under_col, 10) from (select t1.under_col from " + tableName + " t1 " + "join " + tableName + " t2 on t1.under_col=t2.under_col order by sleep(t1.under_col, 0))" + " t group by t.under_col"; - runQueryWithTrigger(query, cmds, trigger + " violated"); + runQueryWithTrigger(query, cmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -86,7 +106,10 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, trigger + " violated"); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -96,17 +119,23 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, trigger + " violated"); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, trigger + " violated", 110); } @Test(timeout = 120000) public void testTriggerTotalTasks() throws Exception { - Expression expression = ExpressionFactory.fromString("VERTEX_TOTAL_TASKS > 50"); + Expression expression = ExpressionFactory.fromString("VERTEX_TOTAL_TASKS > 20"); Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), trigger + " violated"); + List<String> setCmds = getConfigs(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -116,7 +145,10 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), trigger + " violated"); + List<String> setCmds = getConfigs(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -126,18 +158,22 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), trigger + " violated"); + List<String> setCmds = getConfigs(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, trigger + " violated", 110); } @Test(timeout = 120000) public void testTriggerCustomCreatedFiles() throws Exception { List<String> cmds = getConfigs(); - + cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5"); Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "create table testtab2 as select * from " + tableName; - runQueryWithTrigger(query, cmds, trigger + " violated"); + runQueryWithTrigger(query, cmds, trigger + " violated", 110); // partitioned insert expression = ExpressionFactory.fromString("CREATED_FILES > 10"); @@ -147,7 +183,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest cmds.add("create table src3 (key int) partitioned by (value string)"); query = "insert overwrite table src3 partition (value) select sleep(under_col, 10), value from " + tableName + " where under_col < 100"; - runQueryWithTrigger(query, cmds, trigger + " violated"); + runQueryWithTrigger(query, cmds, trigger + " violated", 110); } @Test(timeout = 240000) @@ -155,40 +191,41 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest List<String> cmds = getConfigs(); cmds.add("drop table src2"); cmds.add("create table src2 (key int) partitioned by (value string)"); - + cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); // query will get cancelled before creating 57 partitions String query = "insert overwrite table src2 partition (value) select * from " + tableName + " where under_col < 100"; Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20"); Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, trigger + " violated"); + runQueryWithTrigger(query, cmds, trigger + " violated", 110); cmds = getConfigs(); // let it create 57 partitions without any triggers query = "insert overwrite table src2 partition (value) select under_col, value from " + tableName + " where under_col < 100"; setupTriggers(Lists.newArrayList()); - runQueryWithTrigger(query, cmds, null); + runQueryWithTrigger(query, cmds, null, 110); // query will try to add 64 more partitions to already existing 57 partitions but will get cancelled for violation query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200"; expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 30"); trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, trigger + " violated"); + runQueryWithTrigger(query, cmds, trigger + " violated", 110); // let it create 64 more partitions (total 57 + 64 = 121) without any triggers query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200"; setupTriggers(Lists.newArrayList()); - runQueryWithTrigger(query, cmds, null); + runQueryWithTrigger(query, cmds, null, 110); // re-run insert into but this time no new partitions will be created, so there will be no violation query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200"; expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 10"); trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, null); + runQueryWithTrigger(query, cmds, null, 110); } @Test(timeout = 120000) @@ -198,7 +235,8 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest cmds.add("drop table src3"); cmds.add("create table src2 (key int) partitioned by (value string)"); cmds.add("create table src3 (key int) partitioned by (value string)"); - + cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); String query = "from " + tableName + " insert overwrite table src2 partition (value) select * where under_col < 100 " + @@ -206,7 +244,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70"); Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, trigger + " violated"); + runQueryWithTrigger(query, cmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -214,7 +252,8 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest List<String> cmds = getConfigs(); cmds.add("drop table src2"); cmds.add("create table src2 (key int) partitioned by (value string)"); - + cmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + cmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); // query will get cancelled before creating 57 partitions String query = "insert overwrite table src2 partition (value) " + @@ -225,7 +264,7 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70"); Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, trigger + " violated"); + runQueryWithTrigger(query, cmds, trigger + " violated", 110); } @Test(timeout = 120000) @@ -235,55 +274,70 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(trigger)); String query = "select l.under_col, l.value from " + tableName + " l join " + tableName + " r on l.under_col>=r.under_col"; - runQueryWithTrigger(query, null, null); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, null, 110); } @Test(timeout = 120000) public void testTriggerDagRawInputSplitsKill() throws Exception { - // Map 1 - 55 splits - // Map 3 - 55 splits - Expression expression = ExpressionFactory.fromString("DAG_RAW_INPUT_SPLITS > 100"); + // Map 1 - 28 splits + // Map 3 - 28 splits + Expression expression = ExpressionFactory.fromString("DAG_RAW_INPUT_SPLITS > 50"); Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select t1.under_col, t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), "Query was cancelled"); + List<String> setCmds = getConfigs(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, "Query was cancelled", 110); } @Test(timeout = 120000) public void testTriggerVertexRawInputSplitsNoKill() throws Exception { - // Map 1 - 55 splits - // Map 3 - 55 splits - Expression expression = ExpressionFactory.fromString("VERTEX_RAW_INPUT_SPLITS > 100"); + // Map 1 - 28 splits + // Map 3 - 28 splits + Expression expression = ExpressionFactory.fromString("VERTEX_RAW_INPUT_SPLITS > 50"); Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select t1.under_col, t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), null); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, null, 110); } @Test(timeout = 120000) public void testTriggerVertexRawInputSplitsKill() throws Exception { - // Map 1 - 55 splits - // Map 3 - 55 splits - Expression expression = ExpressionFactory.fromString("VERTEX_RAW_INPUT_SPLITS > 50"); + // Map 1 - 28 splits + // Map 3 - 28 splits + Expression expression = ExpressionFactory.fromString("VERTEX_RAW_INPUT_SPLITS > 20"); Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select t1.under_col, t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), "Query was cancelled"); + List<String> setCmds = getConfigs(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, "Query was cancelled", 110); } @Test(timeout = 120000) public void testTriggerDefaultRawInputSplits() throws Exception { - // Map 1 - 55 splits - // Map 3 - 55 splits - Expression expression = ExpressionFactory.fromString("RAW_INPUT_SPLITS > 50"); + // Map 1 - 28 splits + // Map 3 - 28 splits + Expression expression = ExpressionFactory.fromString("RAW_INPUT_SPLITS > 20"); Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select t1.under_col, t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), "Query was cancelled"); + List<String> setCmds = getConfigs(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, "Query was cancelled", 110); } @Test(timeout = 120000) @@ -295,7 +349,10 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, execTimeTrigger + " violated"); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, execTimeTrigger + " violated", 110); } @Test(timeout = 120000) @@ -307,7 +364,10 @@ public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, shuffleTrigger + " violated"); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); + runQueryWithTrigger(query, setCmds, shuffleTrigger + " violated", 110); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/06d8f362/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java index 85391ac..53e7347 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java @@ -35,10 +35,19 @@ import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; import com.google.common.collect.Lists; public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManager { + @Rule + public TestName testName = new TestName(); + + @Override + public String getTestName() { + return getClass().getSimpleName() + "#" + testName.getMethodName(); + } @BeforeClass public static void beforeTest() throws Exception {