This is an automated email from the ASF dual-hosted git repository. klcopp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 71c9af7 HIVE-24001: Don't cache MapWork in tez/ObjectCache during query-based compaction (Karen Coppage, reviewed by Marta Kuczora) 71c9af7 is described below commit 71c9af7b8ead470520a6c3a4848be9c67eb80f10 Author: Karen Coppage <karenlcopp...@gmail.com> AuthorDate: Tue Aug 11 10:38:46 2020 +0200 HIVE-24001: Don't cache MapWork in tez/ObjectCache during query-based compaction (Karen Coppage, reviewed by Marta Kuczora) Closes #1368 --- .../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java | 13 ++++++++++++- .../apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java | 2 ++ .../hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java | 2 +- .../hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java | 2 +- 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 8c9d53f..5cfa759 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -30,6 +30,7 @@ import java.util.Set; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -93,6 +94,8 @@ public class MapRecordProcessor extends RecordProcessor { private final List<String> cacheKeys = new ArrayList<>(); private final List<String> dynamicValueCacheKeys = new ArrayList<>(); private final ObjectCache cache, dynamicValueCache; + // is this part of the query-based compaction process + private final boolean isInCompaction; public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception { super(jconf, context); @@ -104,6 +107,8 @@ public class MapRecordProcessor extends RecordProcessor { dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false, true); execContext = new ExecMapperContext(jconf); execContext.setJc(jconf); + isInCompaction = CompactorUtil.COMPACTOR.equalsIgnoreCase( + HiveConf.getVar(jconf, HiveConf.ConfVars.SPLIT_GROUPING_MODE)); } private void setLlapOfFragmentId(final ProcessorContext context) { @@ -126,7 +131,13 @@ public class MapRecordProcessor extends RecordProcessor { // create map and fetch operators - mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf)); + if (!isInCompaction) { + mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf)); + } else { + // During query-based compaction, we don't want to retrieve old MapWork from the cache, we want a new mapper + // and new UDF validate_acid_sort_order instance for each bucket, otherwise validate_acid_sort_order will fail. + mapWork = Utilities.getMapWork(jconf); + } // TODO HIVE-14042. Cleanup may be required if exiting early. Utilities.setMapWork(jconf, mapWork); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java index 85c1bf6..28fc642 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java @@ -25,6 +25,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; public class CompactorUtil { + public static final String COMPACTOR = "compactor"; + public interface ThrowingRunnable<E extends Exception> { void run() throws E; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 3df8ad7..ec81bfe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -49,7 +49,7 @@ final class MajorQueryCompactor extends QueryCompactor { * For now, we will group splits on tez so that we end up with all bucket files, * with same bucket number in one map task. */ - conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); + conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, CompactorUtil.COMPACTOR); String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_"; String tmpTableName = tmpPrefix + System.currentTimeMillis(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java index 810f150..79e5595 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -53,7 +53,7 @@ final class MinorQueryCompactor extends QueryCompactor { // Set up the session for driver. HiveConf conf = new HiveConf(hiveConf); conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); - conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); + conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, CompactorUtil.COMPACTOR); conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS, false); conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false); String tmpTableName =