This is an automated email from the ASF dual-hosted git repository. kgyrtkirk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit 805565022872d74bc3053272b100f324ae8b561e Author: Ivan Suller <isul...@cloudera.com> AuthorDate: Tue Sep 17 14:59:31 2019 +0200 HIVE-22065: Code cleanup around org.apache.hadoop.hive.ql.exec.tez.RecordProcessor (Ivan Suller via Zoltan Haindrich) Signed-off-by: Zoltan Haindrich <k...@rxd.hu> --- .../hive/ql/exec/tez/MapRecordProcessor.java | 93 ++++++---------- .../hive/ql/exec/tez/MergeFileRecordProcessor.java | 6 +- .../hadoop/hive/ql/exec/tez/RecordProcessor.java | 20 ++-- .../hive/ql/exec/tez/ReduceRecordProcessor.java | 121 ++++++++------------- 4 files changed, 91 insertions(+), 149 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index ea2e1fd..8c9d53f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -27,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Callable; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -79,21 +77,22 @@ import org.apache.tez.runtime.library.api.KeyValueReader; * Just pump the records through the query plan. */ public class MapRecordProcessor extends RecordProcessor { - public static final Logger l4j = LoggerFactory.getLogger(MapRecordProcessor.class); - protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; + private static final Logger LOG = LoggerFactory.getLogger(MapRecordProcessor.class); + private static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private AbstractMapOperator mapOp; - private final List<AbstractMapOperator> mergeMapOpList = new ArrayList<AbstractMapOperator>(); + private final List<AbstractMapOperator> mergeMapOpList = new ArrayList<>(); private MapRecordSource[] sources; - private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>(); + private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<>(); private int position; - MRInputLegacy legacyMRInput; - MultiMRInput mainWorkMultiMRInput; + private MRInputLegacy legacyMRInput; + private MultiMRInput mainWorkMultiMRInput; private final ExecMapperContext execContext; private MapWork mapWork; - List<MapWork> mergeWorkList; - List<String> cacheKeys, dynamicValueCacheKeys; - ObjectCache cache, dynamicValueCache; + private List<MapWork> mergeWorkList; + private final List<String> cacheKeys = new ArrayList<>(); + private final List<String> dynamicValueCacheKeys = new ArrayList<>(); + private final ObjectCache cache, dynamicValueCache; public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception { super(jconf, context); @@ -105,16 +104,12 @@ public class MapRecordProcessor extends RecordProcessor { dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false, true); execContext = new ExecMapperContext(jconf); execContext.setJc(jconf); - cacheKeys = new ArrayList<String>(); - dynamicValueCacheKeys = new ArrayList<String>(); } private void setLlapOfFragmentId(final ProcessorContext context) { // TODO: could we do this only if the OF is actually used? String attemptId = Converters.createTaskAttemptId(context).toString(); - if (l4j.isDebugEnabled()) { - l4j.debug("Setting the LLAP fragment ID for OF to " + attemptId); - } + LOG.debug("Setting the LLAP fragment ID for OF to {}", attemptId); jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, attemptId); } @@ -131,12 +126,7 @@ public class MapRecordProcessor extends RecordProcessor { // create map and fetch operators - mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() { - @Override - public Object call() { - return Utilities.getMapWork(jconf); - } - }); + mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf)); // TODO HIVE-14042. Cleanup may be required if exiting early. Utilities.setMapWork(jconf, mapWork); @@ -147,7 +137,7 @@ public class MapRecordProcessor extends RecordProcessor { String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); if (prefixes != null) { - mergeWorkList = new ArrayList<MapWork>(); + mergeWorkList = new ArrayList<>(); for (final String prefix : prefixes.split(",")) { if (prefix == null || prefix.isEmpty()) { @@ -159,13 +149,7 @@ public class MapRecordProcessor extends RecordProcessor { checkAbortCondition(); mergeWorkList.add( - (MapWork) cache.retrieve(key, - new Callable<Object>() { - @Override - public Object call() { - return Utilities.getMergeWork(jconf, prefix); - } - })); + (MapWork) cache.retrieve(key, () -> Utilities.getMergeWork(jconf, prefix))); } } @@ -189,7 +173,7 @@ public class MapRecordProcessor extends RecordProcessor { createOutputMap(); // Start all the Outputs. for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) { - l4j.debug("Starting Output: " + outputEntry.getKey()); + LOG.debug("Starting Output: " + outputEntry.getKey()); outputEntry.getValue().start(); ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } @@ -233,25 +217,25 @@ public class MapRecordProcessor extends RecordProcessor { // initialize the merge operators first. if (mergeMapOp != null) { mergeMapOp.setConf(mergeMapWork); - l4j.info("Input name is " + mergeMapWork.getName()); + LOG.info("Input name is {}", mergeMapWork.getName()); jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName()); mergeMapOp.initialize(jconf, null); // if there are no files/partitions to read, we need to skip trying to read MultiMRInput multiMRInput = multiMRInputMap.get(mergeMapWork.getName()); boolean skipRead = false; if (multiMRInput == null) { - l4j.info("Multi MR Input for work " + mergeMapWork.getName() + " is null. Skipping read."); + LOG.info("Multi MR Input for work {} is null. Skipping read.", mergeMapWork.getName()); skipRead = true; } else { Collection<KeyValueReader> keyValueReaders = multiMRInput.getKeyValueReaders(); if ((keyValueReaders == null) || (keyValueReaders.isEmpty())) { - l4j.info("Key value readers are null or empty and hence skipping read. " - + "KeyValueReaders = " + keyValueReaders); + LOG.info("Key value readers are null or empty and hence skipping read. " + + "KeyValueReaders = {}", keyValueReaders); skipRead = true; } } if (skipRead) { - List<Operator<?>> children = new ArrayList<Operator<?>>(); + List<Operator<?>> children = new ArrayList<>(); children.addAll(mergeMapOp.getConf().getAliasToWork().values()); // do the same thing as setChildren when there is nothing to read. // the setChildren method initializes the object inspector needed by the operators @@ -286,19 +270,19 @@ public class MapRecordProcessor extends RecordProcessor { // initialize map operator mapOp.setConf(mapWork); - l4j.info("Main input name is " + mapWork.getName()); + LOG.info("Main input name is " + mapWork.getName()); jconf.set(Utilities.INPUT_NAME, mapWork.getName()); mapOp.initialize(jconf, null); checkAbortCondition(); mapOp.setChildren(jconf); mapOp.passExecContext(execContext); - l4j.info(mapOp.dump(0)); + LOG.info(mapOp.dump(0)); // set memory available for operators long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask(); if (mapOp.getConf() != null) { mapOp.getConf().setMaxMemoryAvailable(memoryAvailableToTask); - l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask)); + LOG.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask)); } OperatorUtils.setMemoryAvailable(mapOp.getChildOperators(), memoryAvailableToTask); @@ -309,12 +293,7 @@ public class MapRecordProcessor extends RecordProcessor { String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY; // On LLAP dynamic value registry might already be cached. final DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey, - new Callable<DynamicValueRegistryTez>() { - @Override - public DynamicValueRegistryTez call() { - return new DynamicValueRegistryTez(); - } - }); + () -> new DynamicValueRegistryTez()); dynamicValueCacheKeys.add(valueRegistryKey); RegistryConfTez registryConf = new RegistryConfTez(jconf, mapWork, processorContext, inputs); registryTez.init(registryConf); @@ -322,7 +301,7 @@ public class MapRecordProcessor extends RecordProcessor { checkAbortCondition(); initializeMapRecordSources(); mapOp.initializeMapOperator(jconf); - if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) { + if ((mergeMapOpList != null) && !mergeMapOpList.isEmpty()) { for (AbstractMapOperator mergeMapOp : mergeMapOpList) { jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName()); // TODO HIVE-14042. abort handling: Handling of mergeMapOp @@ -354,7 +333,7 @@ public class MapRecordProcessor extends RecordProcessor { // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; } else if (e instanceof InterruptedException) { - l4j.info("Hit an interrupt while initializing MapRecordProcessor. Message={}", + LOG.info("Hit an interrupt while initializing MapRecordProcessor. Message={}", e.getMessage()); throw (InterruptedException) e; } else { @@ -383,7 +362,7 @@ public class MapRecordProcessor extends RecordProcessor { String inputName = mapOp.getConf().getName(); MultiMRInput multiMRInput = multiMRInputMap.get(inputName); Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders(); - l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName); + LOG.debug("There are {} key-value readers for input {}", kvReaders.size(), inputName); if (kvReaders.size() > 0) { reader = getKeyValueReader(kvReaders, mapOp); sources[tag].init(jconf, mapOp, reader); @@ -392,10 +371,8 @@ public class MapRecordProcessor extends RecordProcessor { ((TezContext) MapredContext.get()).setRecordSources(sources); } - @SuppressWarnings("deprecation") private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders, - AbstractMapOperator mapOp) - throws Exception { + AbstractMapOperator mapOp) throws Exception { List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders); // this sets up the map operator contexts correctly mapOp.initializeContexts(); @@ -436,10 +413,10 @@ public class MapRecordProcessor extends RecordProcessor { // this will abort initializeOp() if (mapOp != null) { - l4j.info("Forwarding abort to mapOp: {} " + mapOp.getName()); + LOG.info("Forwarding abort to mapOp: {} ", mapOp.getName()); mapOp.abort(); } else { - l4j.info("mapOp not setup yet. abort not being forwarded"); + LOG.info("mapOp not setup yet. abort not being forwarded"); } } @@ -450,13 +427,13 @@ public class MapRecordProcessor extends RecordProcessor { setAborted(execContext.getIoCxt().getIOExceptions()); } - if (cache != null && cacheKeys != null) { + if (cache != null) { for (String k: cacheKeys) { cache.release(k); } } - if (dynamicValueCache != null && dynamicValueCacheKeys != null) { + if (dynamicValueCache != null) { for (String k: dynamicValueCacheKeys) { dynamicValueCache.release(k); } @@ -491,7 +468,7 @@ public class MapRecordProcessor extends RecordProcessor { } catch (Exception e) { if (!isAborted()) { // signal new failure to map-reduce - l4j.error("Hit error while closing operators - failing tree"); + LOG.error("Hit error while closing operators - failing tree"); throw new RuntimeException("Hive Runtime Error while closing operators", e); } } finally { @@ -505,7 +482,7 @@ public class MapRecordProcessor extends RecordProcessor { MRInputLegacy theMRInput = null; // start all mr/multi-mr inputs - Set<Input> li = new HashSet<Input>(); + Set<Input> li = new HashSet<>(); for (LogicalInput inp: inputs.values()) { if (inp instanceof MRInputLegacy || inp instanceof MultiMRInput) { inp.start(); @@ -516,7 +493,7 @@ public class MapRecordProcessor extends RecordProcessor { // MultiMRInput may not. Fix once TEZ-3302 is resolved. processorContext.waitForAllInputsReady(li); - l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray())); + LOG.info("The input names are: {}", String.join(",", inputs.keySet())); for (Entry<String, LogicalInput> inp : inputs.entrySet()) { if (inp.getValue() instanceof MRInputLegacy) { if (theMRInput != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java index c55a394..13f5f12 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java @@ -135,7 +135,7 @@ public class MergeFileRecordProcessor extends RecordProcessor { // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; } else if (e instanceof InterruptedException) { - l4j.info("Hit an interrupt while initializing MergeFileRecordProcessor. Message={}", + LOG.info("Hit an interrupt while initializing MergeFileRecordProcessor. Message={}", e.getMessage()); throw (InterruptedException) e; } else { @@ -184,7 +184,7 @@ public class MergeFileRecordProcessor extends RecordProcessor { } catch (Exception e) { if (!isAborted()) { // signal new failure to map-reduce - l4j.error("Hit error while closing operators - failing tree"); + LOG.error("Hit error while closing operators - failing tree"); throw new RuntimeException("Hive Runtime Error while closing operators", e); } @@ -217,7 +217,7 @@ public class MergeFileRecordProcessor extends RecordProcessor { // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; } else { - l4j.error(StringUtils.stringifyException(e)); + LOG.error(StringUtils.stringifyException(e)); throw new RuntimeException(e); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index 8639096..6697f62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hive.ql.exec.tez; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.Callable; import org.apache.hadoop.hive.ql.exec.ObjectCache; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -39,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; /** * Process input from tez LogicalInput and write output @@ -52,7 +51,7 @@ public abstract class RecordProcessor extends InterruptibleProcessing { protected Map<String, OutputCollector> outMap; protected final ProcessorContext processorContext; - public static final Logger l4j = LoggerFactory.getLogger(RecordProcessor.class); + private static final Logger LOG = LoggerFactory.getLogger(RecordProcessor.class); protected MRTaskReporter reporter; @@ -78,7 +77,7 @@ public abstract class RecordProcessor extends InterruptibleProcessing { this.outputs = outputs; checkAbortCondition(); - Utilities.tryLoggingClassPaths(jconf, l4j); + Utilities.tryLoggingClassPaths(jconf, LOG); } /** @@ -91,7 +90,7 @@ public abstract class RecordProcessor extends InterruptibleProcessing { protected void createOutputMap() { Preconditions.checkState(outMap == null, "Outputs should only be setup once"); - outMap = Maps.newHashMap(); + outMap = new HashMap<>(); for (Entry<String, LogicalOutput> entry : outputs.entrySet()) { TezKVOutputCollector collector = new TezKVOutputCollector(entry.getValue()); outMap.put(entry.getKey(), collector); @@ -102,22 +101,17 @@ public abstract class RecordProcessor extends InterruptibleProcessing { ObjectCache cache, List<String> cacheKeys) throws HiveException { String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); if (prefixes != null) { - List<BaseWork> mergeWorkList = new ArrayList<BaseWork>(); + List<BaseWork> mergeWorkList = new ArrayList<>(); for (final String prefix : prefixes.split(",")) { - if (prefix == null || prefix.isEmpty()) { + if (prefix.isEmpty()) { continue; } key = prefix; cacheKeys.add(key); - mergeWorkList.add((BaseWork) cache.retrieve(key, new Callable<Object>() { - @Override - public Object call() { - return Utilities.getMergeWork(jconf, prefix); - } - })); + mergeWorkList.add(cache.retrieve(key, () -> Utilities.getMergeWork(jconf, prefix))); } return mergeWorkList; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 152dc98..03edbf7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -20,12 +20,11 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.Callable; import org.apache.hadoop.hive.llap.LlapUtil; import org.slf4j.Logger; @@ -63,21 +62,20 @@ import com.google.common.collect.Lists; * Just pump the records through the query plan. */ public class ReduceRecordProcessor extends RecordProcessor { + private static final Logger LOG = LoggerFactory.getLogger(ReduceRecordProcessor.class); private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__"; - private ObjectCache cache, dynamicValueCache; - - public static final Logger l4j = LoggerFactory.getLogger(ReduceRecordProcessor.class); + private final ObjectCache cache, dynamicValueCache; private ReduceWork reduceWork; - List<BaseWork> mergeWorkList = null; - List<String> cacheKeys, dynamicValueCacheKeys; + private final List<BaseWork> mergeWorkList; + private final List<String> cacheKeys; + private final List<String> dynamicValueCacheKeys = new ArrayList<>(); - private final Map<Integer, DummyStoreOperator> connectOps = - new TreeMap<Integer, DummyStoreOperator>(); - private final Map<Integer, ReduceWork> tagToReducerMap = new HashMap<Integer, ReduceWork>(); + private final Map<Integer, DummyStoreOperator> connectOps = new TreeMap<>(); + private final Map<Integer, ReduceWork> tagToReducerMap = new HashMap<>(); private Operator<?> reducer; @@ -94,22 +92,15 @@ public class ReduceRecordProcessor extends RecordProcessor { String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY; cacheKeys = Lists.newArrayList(cacheKey); - dynamicValueCacheKeys = new ArrayList<String>(); - reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() { - @Override - public Object call() { - return Utilities.getReduceWork(jconf); - } - }); + reduceWork = cache.retrieve(cacheKey, () -> Utilities.getReduceWork(jconf)); Utilities.setReduceWork(jconf, reduceWork); mergeWorkList = getMergeWorkList(jconf, cacheKey, queryId, cache, cacheKeys); } @Override - void init( - MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, - Map<String, LogicalOutput> outputs) throws Exception { + void init(MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) + throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(mrReporter, inputs, outputs); @@ -118,28 +109,22 @@ public class ReduceRecordProcessor extends RecordProcessor { // TODO HIVE-14042. Move to using a loop and a timed wait once TEZ-3302 is fixed. checkAbortCondition(); if (shuffleInputs != null) { - l4j.info("Waiting for ShuffleInputs to become ready"); + LOG.info("Waiting for ShuffleInputs to become ready"); processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs)); } connectOps.clear(); ReduceWork redWork = reduceWork; - l4j.info("Main work is " + reduceWork.getName()); + LOG.info("Main work is " + reduceWork.getName()); List<HashTableDummyOperator> workOps = reduceWork.getDummyOps(); - HashSet<HashTableDummyOperator> dummyOps = workOps == null ? null : new HashSet<>(workOps); + Set<HashTableDummyOperator> dummyOps = workOps == null ? new HashSet<>() : new HashSet<>(workOps); tagToReducerMap.put(redWork.getTag(), redWork); if (mergeWorkList != null) { for (BaseWork mergeWork : mergeWorkList) { - if (l4j.isDebugEnabled()) { - l4j.debug("Additional work " + mergeWork.getName()); - } + LOG.debug("Additional work {}", mergeWork.getName()); workOps = mergeWork.getDummyOps(); if (workOps != null) { - if (dummyOps == null) { - dummyOps = new HashSet<>(workOps); - } else { - dummyOps.addAll(workOps); - } + dummyOps.addAll(workOps); } ReduceWork mergeReduceWork = (ReduceWork) mergeWork; reducer = mergeReduceWork.getReducer(); @@ -167,19 +152,14 @@ public class ReduceRecordProcessor extends RecordProcessor { long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask(); if (reducer.getConf() != null) { reducer.getConf().setMaxMemoryAvailable(memoryAvailableToTask); - l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask)); + LOG.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask)); } OperatorUtils.setMemoryAvailable(reducer.getChildOperators(), memoryAvailableToTask); // Setup values registry String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY; - DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey, - new Callable<DynamicValueRegistryTez>() { - @Override - public DynamicValueRegistryTez call() { - return new DynamicValueRegistryTez(); - } - }); + DynamicValueRegistryTez registryTez = + dynamicValueCache.retrieve(valueRegistryKey, () -> new DynamicValueRegistryTez()); dynamicValueCacheKeys.add(valueRegistryKey); RegistryConfTez registryConf = new RegistryConfTez(jconf, reduceWork, processorContext, inputs); registryTez.init(registryConf); @@ -200,15 +180,15 @@ public class ReduceRecordProcessor extends RecordProcessor { reducer = redWork.getReducer(); // Check immediately after reducer is assigned, in cae the abort came in during checkAbortCondition(); - initializeSourceForTag(redWork, i, mainWorkOIs, sources, - redWork.getTagToValueDesc().get(0), redWork.getTagToInput().get(0)); + initializeSourceForTag(redWork, i, mainWorkOIs, sources, redWork.getTagToValueDesc().get(0), + redWork.getTagToInput().get(0)); reducer.initializeLocalWork(jconf); } reducer = reduceWork.getReducer(); // Check immediately after reducer is assigned, in cae the abort came in during checkAbortCondition(); ((TezContext) MapredContext.get()).setRecordSources(sources); - reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[bigTablePosition] }); + reducer.initialize(jconf, new ObjectInspector[] {mainWorkOIs[bigTablePosition]}); for (int i : tagToReducerMap.keySet()) { if (i == bigTablePosition) { continue; @@ -217,7 +197,7 @@ public class ReduceRecordProcessor extends RecordProcessor { reducer = redWork.getReducer(); // Check immediately after reducer is assigned, in cae the abort came in during checkAbortCondition(); - reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] }); + reducer.initialize(jconf, new ObjectInspector[] {mainWorkOIs[i]}); } } checkAbortCondition(); @@ -226,25 +206,21 @@ public class ReduceRecordProcessor extends RecordProcessor { // initialize reduce operator tree try { - l4j.info(reducer.dump(0)); + LOG.info(reducer.dump(0)); // Initialization isn't finished until all parents of all operators // are initialized. For broadcast joins that means initializing the // dummy parent operators as well. - if (dummyOps != null) { - for (HashTableDummyOperator dummyOp : dummyOps) { - // TODO HIVE-14042. Propagating abort to dummyOps. - dummyOp.initialize(jconf, null); - checkAbortCondition(); - } + for (HashTableDummyOperator dummyOp : dummyOps) { + // TODO HIVE-14042. Propagating abort to dummyOps. + dummyOp.initialize(jconf, null); + checkAbortCondition(); } // set output collector for any reduce sink operators in the pipeline. - List<Operator<?>> children = new LinkedList<Operator<?>>(); + List<Operator<?>> children = new ArrayList<>(); children.add(reducer); - if (dummyOps != null) { - children.addAll(dummyOps); - } + children.addAll(dummyOps); createOutputMap(); OperatorUtils.setChildrenCollector(children, outMap); @@ -258,8 +234,7 @@ public class ReduceRecordProcessor extends RecordProcessor { // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; } else if (e instanceof InterruptedException) { - l4j.info("Hit an interrupt while initializing ReduceRecordProcessor. Message={}", - e.getMessage()); + LOG.info("Hit an interrupt while initializing ReduceRecordProcessor. Message={}", e.getMessage()); throw (InterruptedException) e; } else { throw new RuntimeException(redWork.getName() + " operator initialization failed", e); @@ -281,9 +256,8 @@ public class ReduceRecordProcessor extends RecordProcessor { } } - private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois, - ReduceRecordSource[] sources, TableDesc valueTableDesc, String inputName) - throws Exception { + private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois, ReduceRecordSource[] sources, + TableDesc valueTableDesc, String inputName) throws Exception { reducer = redWork.getReducer(); reducer.getParentOperators().clear(); reducer.setParentOperators(null); // clear out any parents as reducer is the root @@ -295,9 +269,8 @@ public class ReduceRecordProcessor extends RecordProcessor { // Only the big table input source should be vectorized (if applicable) // Note this behavior may have to change if we ever implement a vectorized merge join boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode(); - sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, - valueTableDesc, reader, tag == bigTablePosition, (byte) tag, - redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum(), + sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, valueTableDesc, reader, + tag == bigTablePosition, (byte) tag, redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum(), redWork.getVectorizedTestingReducerBatchSize()); ois[tag] = sources[tag].getObjectInspector(); } @@ -306,7 +279,7 @@ public class ReduceRecordProcessor extends RecordProcessor { void run() throws Exception { for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) { - l4j.info("Starting Output: " + outputEntry.getKey()); + LOG.info("Starting Output: " + outputEntry.getKey()); if (!isAborted()) { outputEntry.getValue().start(); ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); @@ -327,10 +300,10 @@ public class ReduceRecordProcessor extends RecordProcessor { super.abort(); if (reducer != null) { - l4j.info("Forwarding abort to reducer: {} " + reducer.getName()); + LOG.info("Forwarding abort to reducer: {} " + reducer.getName()); reducer.abort(); } else { - l4j.info("reducer not setup yet. abort not being forwarded"); + LOG.info("reducer not setup yet. abort not being forwarded"); } } @@ -356,22 +329,22 @@ public class ReduceRecordProcessor extends RecordProcessor { } @Override - void close(){ - if (cache != null && cacheKeys != null) { + void close() { + if (cache != null) { for (String key : cacheKeys) { cache.release(key); } } - if (dynamicValueCache != null && dynamicValueCacheKeys != null) { - for (String k: dynamicValueCacheKeys) { + if (dynamicValueCache != null) { + for (String k : dynamicValueCacheKeys) { dynamicValueCache.release(k); } } try { if (isAborted()) { - for (ReduceRecordSource rs: sources) { + for (ReduceRecordSource rs : sources) { if (!rs.close()) { setAborted(false); // Preserving the old logic. Hmm... break; @@ -402,9 +375,8 @@ public class ReduceRecordProcessor extends RecordProcessor { } catch (Exception e) { if (!isAborted()) { // signal new failure to map-reduce - l4j.error("Hit error while closing operators - failing tree"); - throw new RuntimeException( - "Hive Runtime Error while closing operators: " + e.getMessage(), e); + LOG.error("Hit error while closing operators - failing tree"); + throw new RuntimeException("Hive Runtime Error while closing operators: " + e.getMessage(), e); } } finally { Utilities.clearWorkMap(jconf); @@ -418,8 +390,7 @@ public class ReduceRecordProcessor extends RecordProcessor { if (childOp instanceof DummyStoreOperator) { return (DummyStoreOperator) childOp; } else { - throw new IllegalStateException("Was expecting dummy store operator but found: " - + childOp); + throw new IllegalStateException("Was expecting dummy store operator but found: " + childOp); } } else { return getJoinParentOp(childOp);