HIVE-20723: Allow per-table specification of compaction YARN queue (Saurabh Seth via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/35278429 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/35278429 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/35278429 Branch: refs/heads/master Commit: 35278429d9677b0878a4523ed7b03a5016f81e1d Parents: 3c6a36b Author: Saurabh Seth <saurabh.s...@gmail.com> Authored: Sat Oct 13 16:46:16 2018 -0700 Committer: Eugene Koifman <ekoif...@apache.org> Committed: Sat Oct 13 16:46:16 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/txn/compactor/TestCompactor.java | 10 ++++++++-- .../apache/hadoop/hive/ql/txn/compactor/CompactorMR.java | 11 ++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/35278429/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index cffa21a..a9d7468 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1584,6 +1584,7 @@ public class TestCompactor { */ @Test public void testTableProperties() throws Exception { + conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, "root.user1"); String tblName1 = "ttp1"; // plain acid table String tblName2 = "ttp2"; // acid table with customized tblproperties executeStatementOnDriver("drop table if exists " + tblName1, driver); @@ -1596,7 +1597,8 @@ public class TestCompactor { "'transactional'='true'," + "'compactor.mapreduce.map.memory.mb'='2048'," + // 2048 MB 
memory for compaction map job "'compactorthreshold.hive.compactor.delta.num.threshold'='4'," + // minor compaction if more than 4 delta dirs - "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.47'" + // major compaction if more than 47% + "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.47'," + // major compaction if more than 47% + "'compactor.hive.compactor.job.queue'='root.user2'" + // Override the system wide compactor queue for this table ")", driver); // Insert 5 rows to both tables @@ -1641,6 +1643,7 @@ public class TestCompactor { t.run(); JobConf job = t.getMrJob(); Assert.assertEquals(2048, job.getMemoryForMapTask()); // 2048 comes from tblproperties + Assert.assertEquals("root.user2", job.getQueueName()); // Queue name comes from table properties // Compact ttp1 stop = new AtomicBoolean(true); t = new Worker(); @@ -1651,6 +1654,7 @@ public class TestCompactor { t.run(); job = t.getMrJob(); Assert.assertEquals(1024, job.getMemoryForMapTask()); // 1024 is the default value + Assert.assertEquals("root.user1", job.getQueueName()); // The system wide compaction queue name // Clean up runCleaner(conf); rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -1702,7 +1706,8 @@ public class TestCompactor { executeStatementOnDriver("alter table " + tblName2 + " compact 'major'" + " with overwrite tblproperties (" + "'compactor.mapreduce.map.memory.mb'='3072'," + - "'tblprops.orc.compress.size'='3141')", driver); + "'tblprops.orc.compress.size'='3141'," + + "'compactor.hive.compactor.job.queue'='root.user2')", driver); rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(4, rsp.getCompacts().size()); @@ -1722,6 +1727,7 @@ public class TestCompactor { job = t.getMrJob(); Assert.assertEquals(3072, job.getMemoryForMapTask()); Assert.assertTrue(job.get("hive.compactor.table.props").contains("orc.compress.size4:3141")); + Assert.assertEquals("root.user2", job.getQueueName()); /*createReader(FileSystem fs, Path path) throws 
IOException { */ //we just ran Major compaction so we should have a base_x in tblName2 that has the new files http://git-wip-us.apache.org/repos/asf/hive/blob/35278429/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 611f85a..92c74e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -150,11 +150,6 @@ public class CompactorMR { job.setOutputFormat(NullOutputFormat.class); job.setOutputCommitter(CompactorOutputCommitter.class); - String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE); - if(queueName != null && queueName.length() > 0) { - job.setQueueName(queueName); - } - job.set(FINAL_LOCATION, sd.getLocation()); job.set(TMP_LOCATION, generateTmpPath(sd)); job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat()); @@ -167,6 +162,12 @@ public class CompactorMR { if (ci.properties != null) { overrideTblProps(job, t.getParameters(), ci.properties); } + + String queueName = HiveConf.getVar(job, ConfVars.COMPACTOR_JOB_QUEUE); + if (queueName != null && queueName.length() > 0) { + job.setQueueName(queueName); + } + setColumnTypes(job, sd.getCols()); //with feature on, multiple tasks may get into conflict creating/using TMP_LOCATION and if we were //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter