This is an automated email from the ASF dual-hosted git repository. lpinter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 32c9a71 HIVE-25595: Custom queue settings is not honoured by compaction StatsUpdater (#2702) (Laszlo Pinter, reviewed by Denys Kuzmenko) 32c9a71 is described below commit 32c9a71ca3481688071fc1ba1db8685adcb2a6fd Author: László Pintér <47777102+lcspin...@users.noreply.github.com> AuthorDate: Mon Oct 11 14:22:33 2021 +0200 HIVE-25595: Custom queue settings is not honoured by compaction StatsUpdater (#2702) (Laszlo Pinter, reviewed by Denys Kuzmenko) --- .../hive/ql/txn/compactor/CompactorTestUtil.java | 26 ++++- .../hive/ql/txn/compactor/TestCompactor.java | 6 +- .../ql/txn/compactor/TestMRCompactorOnTez.java | 108 +++++++++++++++++++++ .../hive/ql/txn/compactor/CompactorUtil.java | 37 +++++++ .../hadoop/hive/ql/txn/compactor/Worker.java | 41 +++----- 5 files changed, 184 insertions(+), 34 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java index 05bbb74..99d17a2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java @@ -54,6 +54,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.Set; @@ -139,11 +140,12 @@ class CompactorTestUtil { * @param tblName table name * @param compactionType major/minor * @param isQueryBased true, if query based compaction should be run + * @param properties compaction request properties * @param partNames partition names * @throws Exception compaction cannot be started. 
*/ static void runCompaction(HiveConf conf, String dbName, String tblName, CompactionType compactionType, - boolean isQueryBased, String... partNames) throws Exception { + boolean isQueryBased, Map<String, String> properties, String... partNames) throws Exception { HiveConf hiveConf = new HiveConf(conf); hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, isQueryBased); TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); @@ -151,12 +153,15 @@ class CompactorTestUtil { t.setThreadId((int) t.getId()); t.setConf(hiveConf); t.init(new AtomicBoolean(true)); + CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType); + if (properties != null) { + cr.setProperties(properties); + } if (partNames.length == 0) { - txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType)); + txnHandler.compact(cr); t.run(); } else { for (String partName : partNames) { - CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType); cr.setPartitionname(partName); txnHandler.compact(cr); t.run(); @@ -165,6 +170,21 @@ class CompactorTestUtil { } /** + * Trigger a compaction run. + * @param conf hive configuration + * @param dbName database name + * @param tblName table name + * @param compactionType major/minor + * @param isQueryBased true, if query based compaction should be run + * @param partNames partition names + * @throws Exception compaction cannot be started. + */ + static void runCompaction(HiveConf conf, String dbName, String tblName, CompactionType compactionType, + boolean isQueryBased, String... partNames) throws Exception { + runCompaction(conf, dbName, tblName, compactionType, isQueryBased, null, partNames); + } + + /** * Trigger a compaction cleaner. * @param hConf hive configuration * @throws Exception if cleaner cannot be started. 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index c5056c5..a5b550e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -380,15 +380,17 @@ public class TestCompactor { TxnStore txnHandler = TxnUtils.getTxnStore(conf); CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR); + Table table = msClient.getTable("default", tblName); LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci)); Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf, - System.getProperty("user.name")); + System.getProperty("user.name"), CompactorUtil.getCompactorJobQueueName(conf, ci, table)); su.gatherStats();//compute stats before compaction LOG.debug("List of stats columns after analyze Part1: " + txnHandler.findColumnsWithStats(ci)); CompactionInfo ciPart2 = new CompactionInfo("default", tblName, "bkt=1", CompactionType.MAJOR); LOG.debug("List of stats columns before analyze Part2: " + txnHandler.findColumnsWithStats(ci)); - su = Worker.StatsUpdater.init(ciPart2, colNames, conf, System.getProperty("user.name")); + su = Worker.StatsUpdater.init(ciPart2, colNames, conf, System.getProperty("user.name"), + CompactorUtil.getCompactorJobQueueName(conf, ciPart2, table)); su.gatherStats();//compute stats before compaction LOG.debug("List of stats columns after analyze Part2: " + txnHandler.findColumnsWithStats(ci)); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMRCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMRCompactorOnTez.java new file mode 100644 index 0000000..36154f4 --- /dev/null +++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMRCompactorOnTez.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; +import static org.junit.Assert.assertEquals; + +public class TestMRCompactorOnTez extends CompactorOnTezTest { + + @Test + public void testCompactorGatherStats() throws Exception{ + conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, true); + conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, CUSTOM_COMPACTION_QUEUE); + 
conf.setBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS, true); + conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + String tmpFolder = folder.newFolder().getAbsolutePath(); + conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder); + + String dbName = "default"; + String tableName = "stats_comp_test"; + List<String> colNames = Arrays.asList("a"); + + executeStatementOnDriver("drop table if exists " + dbName + "." + tableName, driver); + executeStatementOnDriver("create table " + dbName + "." + tableName + + " (a INT) STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + executeStatementOnDriver("insert into " + dbName + "." + tableName + " values(1)", driver); + + // Make sure we do not have statistics for this table yet + // Compaction generates stats only if there is any + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + executeStatementOnDriver("analyze table " + dbName + "." + tableName + " compute statistics for columns", driver); + executeStatementOnDriver("insert into " + dbName + "." + tableName + " values(2)", driver); + + conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, HiveProtoLoggingHook.class.getName()); + // Run major compaction and cleaner + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, false); + conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, StringUtils.EMPTY); + + CompactorTestUtil.runCleaner(conf); + verifySuccessfulCompaction(1); + + List<ColumnStatisticsObj> colStats = msClient.getTableColumnStatistics(dbName, tableName, colNames, Constants.HIVE_ENGINE); + assertEquals("Stats should be there", 1, colStats.size()); + assertEquals("Value should contain new data", 2, colStats.get(0).getStatsData().getLongStats().getHighValue()); + assertEquals("Value should contain new data", 1, colStats.get(0).getStatsData().getLongStats().getLowValue()); + + executeStatementOnDriver("insert into " + dbName + "." 
+ tableName + " values(3)", driver); + executeStatementOnDriver("alter table " + dbName + "." + tableName + " set tblproperties('compactor.mapred.job.queue.name'='" + + CUSTOM_COMPACTION_QUEUE + "')", driver); + + conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, HiveProtoLoggingHook.class.getName()); + // Run major compaction and cleaner + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, false); + conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, StringUtils.EMPTY); + + CompactorTestUtil.runCleaner(conf); + verifySuccessfulCompaction(2); + + colStats = msClient.getTableColumnStatistics(dbName, tableName, colNames, Constants.HIVE_ENGINE); + assertEquals("Stats should be there", 1, colStats.size()); + assertEquals("Value should contain new data", 3, colStats.get(0).getStatsData().getLongStats().getHighValue()); + assertEquals("Value should contain new data", 1, colStats.get(0).getStatsData().getLongStats().getLowValue()); + + executeStatementOnDriver("insert into " + dbName + "." 
+ tableName + " values(4)", driver); + conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, HiveProtoLoggingHook.class.getName()); + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, false, + Collections.singletonMap("compactor.mapred.job.queue.name", CUSTOM_COMPACTION_QUEUE)); + conf.setVar(HiveConf.ConfVars.PREEXECHOOKS, StringUtils.EMPTY); + + CompactorTestUtil.runCleaner(conf); + verifySuccessfulCompaction(3); + + colStats = msClient.getTableColumnStatistics(dbName, tableName, colNames, Constants.HIVE_ENGINE); + assertEquals("Stats should be there", 1, colStats.size()); + assertEquals("Value should contain new data", 4, colStats.get(0).getStatsData().getLongStats().getHighValue()); + assertEquals("Value should contain new data", 1, colStats.get(0).getStatsData().getLongStats().getLowValue()); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java index 28fc642..a196e2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hive.ql.txn.compactor; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -26,6 +29,8 @@ import java.util.concurrent.ThreadFactory; public class CompactorUtil { public static final String COMPACTOR = "compactor"; + static final String COMPACTOR_PREFIX = "compactor."; + static final String MAPRED_QUEUE_NAME = "mapred.job.queue.name"; public interface ThrowingRunnable<E extends Exception> { void run() throws E; @@ -52,4 +57,36 @@ public class CompactorUtil { 
public static ExecutorService createExecutorWithThreadFactory(int threadCount, String threadNameFormat) { return Executors.newFixedThreadPool(threadCount, createThreadFactory(threadNameFormat)); } + + /** + * Get the compactor queue name if it's defined. + * @param conf global hive conf + * @param ci compaction info object + * @param table instance of table + * @return name of the queue, can be null + */ + static String getCompactorJobQueueName(HiveConf conf, CompactionInfo ci, Table table) { + // Get queue name from the ci. This is passed through + // ALTER TABLE table_name COMPACT 'major' WITH OVERWRITE TBLPROPERTIES('compactor.mapred.job.queue.name'='some_queue') + if (ci.properties != null) { + StringableMap ciProperties = new StringableMap(ci.properties); + String queueName = ciProperties.get(COMPACTOR_PREFIX + MAPRED_QUEUE_NAME); + if (queueName != null && queueName.length() > 0) { + return queueName; + } + } + + // Get queue name from the table properties + String queueName = table.getParameters().get(COMPACTOR_PREFIX + MAPRED_QUEUE_NAME); + if (queueName != null && queueName.length() > 0) { + return queueName; + } + + // Get queue name from global hive conf + queueName = conf.get(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE.varname); + if (queueName != null && queueName.length() > 0) { + return queueName; + } + return null; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index c7cdc95..1b8a13f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -42,11 +42,12 @@ import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.txn.TxnStatus; +import org.apache.hadoop.hive.ql.DriverUtils; import
org.apache.hadoop.hive.ql.io.AcidDirectory; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hive.common.util.Ref; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,8 +56,6 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -150,8 +149,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { static final private Logger LOG = LoggerFactory.getLogger(StatsUpdater.class); public static StatsUpdater init(CompactionInfo ci, List<String> columnListForStats, - HiveConf conf, String userName) { - return new StatsUpdater(ci, columnListForStats, conf, userName); + HiveConf conf, String userName, String compactionQueueName) { + return new StatsUpdater(ci, columnListForStats, conf, userName, compactionQueueName); } /** @@ -162,14 +161,16 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { private final HiveConf conf; private final String userName; private final CompactionInfo ci; + private final String compactionQueueName; private StatsUpdater(CompactionInfo ci, List<String> columnListForStats, - HiveConf conf, String userName) { + HiveConf conf, String userName, String compactionQueueName) { this.conf = new HiveConf(conf); //so that Driver doesn't think it's arleady in a transaction this.conf.unset(ValidTxnList.VALID_TXNS_KEY); this.userName = userName; this.ci = ci; + 
this.compactionQueueName = compactionQueueName; if (!ci.isMajorCompaction() || columnListForStats == null || columnListForStats.isEmpty()) { columnList = Collections.emptyList(); return; @@ -215,29 +216,11 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { sb.setLength(sb.length() - 1); //remove trailing , LOG.info(ci + ": running '" + sb.toString() + "'"); conf.setVar(HiveConf.ConfVars.METASTOREURIS,""); - - //todo: use DriverUtils.runOnDriver() here - QueryState queryState = new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(conf).build(); - SessionState localSession = null; - try (Driver d = new Driver(queryState)) { - if (SessionState.get() == null) { - localSession = new SessionState(conf); - SessionState.start(localSession); - } - try { - d.run(sb.toString()); - } catch (CommandProcessorException e) { - LOG.warn(ci + ": " + sb.toString() + " failed due to: " + e); - } - } finally { - if (localSession != null) { - try { - localSession.close(); - } catch (IOException ex) { - LOG.warn(ci + ": localSession.close() failed due to: " + ex.getMessage(), ex); - } - } + if (compactionQueueName != null && compactionQueueName.length() > 0) { + conf.set(TezConfiguration.TEZ_QUEUE_NAME, compactionQueueName); } + SessionState sessionState = DriverUtils.setUpSessionState(conf, userName, true); + DriverUtils.runOnDriver(conf, userName, sessionState, sb.toString()); } catch (Throwable t) { LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName + ") failed due to: " + t.getMessage(), t); @@ -515,7 +498,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { compactionTxn + " with compute stats set to " + computeStats); final StatsUpdater su = computeStats ? StatsUpdater.init(ci, msc.findColumnsWithStats( CompactionInfo.compactionInfoToStruct(ci)), conf, - runJobAsSelf(ci.runAs) ? ci.runAs : t.getOwner()) : null; + runJobAsSelf(ci.runAs) ? 
ci.runAs : t.getOwner(), CompactorUtil.getCompactorJobQueueName(conf, ci, t)) : null; try { failCompactionIfSetForTest();