This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 805565022872d74bc3053272b100f324ae8b561e
Author: Ivan Suller <isul...@cloudera.com>
AuthorDate: Tue Sep 17 14:59:31 2019 +0200

    HIVE-22065: Code cleanup around org.apache.hadoop.hive.ql.exec.tez.RecordProcessor (Ivan Suller via Zoltan Haindrich)
    
    Signed-off-by: Zoltan Haindrich <k...@rxd.hu>
---
 .../hive/ql/exec/tez/MapRecordProcessor.java       |  93 ++++++----------
 .../hive/ql/exec/tez/MergeFileRecordProcessor.java |   6 +-
 .../hadoop/hive/ql/exec/tez/RecordProcessor.java   |  20 ++--
 .../hive/ql/exec/tez/ReduceRecordProcessor.java    | 121 ++++++++-------------
 4 files changed, 91 insertions(+), 149 deletions(-)
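
Note: the recurring pattern in this cleanup is replacing anonymous Callable
instances with lambdas at ObjectCache.retrieve() call sites. A minimal
before/after sketch, using a hypothetical ObjectCache interface that stands in
for org.apache.hadoop.hive.ql.exec.ObjectCache (the generic
retrieve(String, Callable<T>) shape is assumed from the call sites below):

    import java.util.concurrent.Callable;

    interface ObjectCache {
      // assumed shape of the cache API used in this diff
      <T> T retrieve(String key, Callable<T> loader) throws Exception;
    }

    class RetrieveSketch {
      static Object loadPlan() {
        return new Object(); // stands in for Utilities.getMapWork(jconf)
      }

      static void demo(ObjectCache cache) throws Exception {
        // Before: anonymous inner class implementing Callable
        Object before = cache.retrieve("__MAP_PLAN__", new Callable<Object>() {
          @Override
          public Object call() {
            return loadPlan();
          }
        });

        // After: a lambda with identical behavior and less boilerplate
        Object after = cache.retrieve("__MAP_PLAN__", () -> loadPlan());
      }
    }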

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index ea2e1fd..8c9d53f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -79,21 +77,22 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
  * Just pump the records through the query plan.
  */
 public class MapRecordProcessor extends RecordProcessor {
-  public static final Logger l4j = LoggerFactory.getLogger(MapRecordProcessor.class);
-  protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+  private static final Logger LOG = LoggerFactory.getLogger(MapRecordProcessor.class);
+  private static final String MAP_PLAN_KEY = "__MAP_PLAN__";
 
   private AbstractMapOperator mapOp;
-  private final List<AbstractMapOperator> mergeMapOpList = new ArrayList<AbstractMapOperator>();
+  private final List<AbstractMapOperator> mergeMapOpList = new ArrayList<>();
   private MapRecordSource[] sources;
-  private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
+  private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<>();
   private int position;
-  MRInputLegacy legacyMRInput;
-  MultiMRInput mainWorkMultiMRInput;
+  private MRInputLegacy legacyMRInput;
+  private MultiMRInput mainWorkMultiMRInput;
   private final ExecMapperContext execContext;
   private MapWork mapWork;
-  List<MapWork> mergeWorkList;
-  List<String> cacheKeys, dynamicValueCacheKeys;
-  ObjectCache cache, dynamicValueCache;
+  private List<MapWork> mergeWorkList;
+  private final List<String> cacheKeys = new ArrayList<>();
+  private final List<String> dynamicValueCacheKeys = new ArrayList<>();
+  private final ObjectCache cache, dynamicValueCache;
 
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
@@ -105,16 +104,12 @@ public class MapRecordProcessor extends RecordProcessor {
     dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false, true);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
-    cacheKeys = new ArrayList<String>();
-    dynamicValueCacheKeys = new ArrayList<String>();
   }
 
   private void setLlapOfFragmentId(final ProcessorContext context) {
     // TODO: could we do this only if the OF is actually used?
     String attemptId = Converters.createTaskAttemptId(context).toString();
-    if (l4j.isDebugEnabled()) {
-      l4j.debug("Setting the LLAP fragment ID for OF to " + attemptId);
-    }
+    LOG.debug("Setting the LLAP fragment ID for OF to {}", attemptId);
     jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, attemptId);
   }
 
@@ -131,12 +126,7 @@ public class MapRecordProcessor extends RecordProcessor {
 
 
     // create map and fetch operators
-    mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() {
-        @Override
-        public Object call() {
-          return Utilities.getMapWork(jconf);
-        }
-      });
+    mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf));
     // TODO HIVE-14042. Cleanup may be required if exiting early.
     Utilities.setMapWork(jconf, mapWork);
 
@@ -147,7 +137,7 @@ public class MapRecordProcessor extends RecordProcessor {
 
     String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
     if (prefixes != null) {
-      mergeWorkList = new ArrayList<MapWork>();
+      mergeWorkList = new ArrayList<>();
 
       for (final String prefix : prefixes.split(",")) {
         if (prefix == null || prefix.isEmpty()) {
@@ -159,13 +149,7 @@ public class MapRecordProcessor extends RecordProcessor {
 
         checkAbortCondition();
         mergeWorkList.add(
-            (MapWork) cache.retrieve(key,
-                new Callable<Object>() {
-                  @Override
-                  public Object call() {
-                    return Utilities.getMergeWork(jconf, prefix);
-                  }
-                }));
+            (MapWork) cache.retrieve(key, () -> Utilities.getMergeWork(jconf, prefix)));
       }
     }
 
@@ -189,7 +173,7 @@ public class MapRecordProcessor extends RecordProcessor {
     createOutputMap();
     // Start all the Outputs.
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
-      l4j.debug("Starting Output: " + outputEntry.getKey());
+      LOG.debug("Starting Output: " + outputEntry.getKey());
       outputEntry.getValue().start();
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
@@ -233,25 +217,25 @@ public class MapRecordProcessor extends RecordProcessor {
           // initialize the merge operators first.
           if (mergeMapOp != null) {
             mergeMapOp.setConf(mergeMapWork);
-            l4j.info("Input name is " + mergeMapWork.getName());
+            LOG.info("Input name is {}", mergeMapWork.getName());
             jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
             mergeMapOp.initialize(jconf, null);
             // if there are no files/partitions to read, we need to skip trying to read
             MultiMRInput multiMRInput = multiMRInputMap.get(mergeMapWork.getName());
             boolean skipRead = false;
             if (multiMRInput == null) {
-              l4j.info("Multi MR Input for work " + mergeMapWork.getName() + " is null. Skipping read.");
+              LOG.info("Multi MR Input for work {} is null. Skipping read.", mergeMapWork.getName());
               skipRead = true;
             } else {
              Collection<KeyValueReader> keyValueReaders = multiMRInput.getKeyValueReaders();
               if ((keyValueReaders == null) || (keyValueReaders.isEmpty())) {
-                l4j.info("Key value readers are null or empty and hence skipping read. "
-                    + "KeyValueReaders = " + keyValueReaders);
+                LOG.info("Key value readers are null or empty and hence skipping read. "
+                    + "KeyValueReaders = {}", keyValueReaders);
                 skipRead = true;
               }
             }
             if (skipRead) {
-              List<Operator<?>> children = new ArrayList<Operator<?>>();
+              List<Operator<?>> children = new ArrayList<>();
               children.addAll(mergeMapOp.getConf().getAliasToWork().values());
               // do the same thing as setChildren when there is nothing to read.
               // the setChildren method initializes the object inspector needed by the operators
@@ -286,19 +270,19 @@ public class MapRecordProcessor extends RecordProcessor {
 
       // initialize map operator
       mapOp.setConf(mapWork);
-      l4j.info("Main input name is " + mapWork.getName());
+      LOG.info("Main input name is " + mapWork.getName());
       jconf.set(Utilities.INPUT_NAME, mapWork.getName());
       mapOp.initialize(jconf, null);
       checkAbortCondition();
       mapOp.setChildren(jconf);
       mapOp.passExecContext(execContext);
-      l4j.info(mapOp.dump(0));
+      LOG.info(mapOp.dump(0));
 
       // set memory available for operators
       long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
       if (mapOp.getConf() != null) {
         mapOp.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
-        l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+        LOG.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
       }
       OperatorUtils.setMemoryAvailable(mapOp.getChildOperators(), memoryAvailableToTask);
 
@@ -309,12 +293,7 @@ public class MapRecordProcessor extends RecordProcessor {
       String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY;
       // On LLAP dynamic value registry might already be cached.
       final DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey,
-          new Callable<DynamicValueRegistryTez>() {
-            @Override
-            public DynamicValueRegistryTez call() {
-              return new DynamicValueRegistryTez();
-            }
-          });
+          () -> new DynamicValueRegistryTez());
       dynamicValueCacheKeys.add(valueRegistryKey);
       RegistryConfTez registryConf = new RegistryConfTez(jconf, mapWork, processorContext, inputs);
       registryTez.init(registryConf);
@@ -322,7 +301,7 @@ public class MapRecordProcessor extends RecordProcessor {
       checkAbortCondition();
       initializeMapRecordSources();
       mapOp.initializeMapOperator(jconf);
-      if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
+      if ((mergeMapOpList != null) && !mergeMapOpList.isEmpty()) {
         for (AbstractMapOperator mergeMapOp : mergeMapOpList) {
           jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
           // TODO HIVE-14042. abort handling: Handling of mergeMapOp
@@ -354,7 +333,7 @@ public class MapRecordProcessor extends RecordProcessor {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else if (e instanceof InterruptedException) {
-        l4j.info("Hit an interrupt while initializing MapRecordProcessor. Message={}",
+        LOG.info("Hit an interrupt while initializing MapRecordProcessor. Message={}",
             e.getMessage());
         throw (InterruptedException) e;
       } else {
@@ -383,7 +362,7 @@ public class MapRecordProcessor extends RecordProcessor {
       String inputName = mapOp.getConf().getName();
       MultiMRInput multiMRInput = multiMRInputMap.get(inputName);
       Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders();
-      l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName);
+      LOG.debug("There are {} key-value readers for input {}", kvReaders.size(), inputName);
       if (kvReaders.size() > 0) {
         reader = getKeyValueReader(kvReaders, mapOp);
         sources[tag].init(jconf, mapOp, reader);
@@ -392,10 +371,8 @@ public class MapRecordProcessor extends RecordProcessor {
     ((TezContext) MapredContext.get()).setRecordSources(sources);
   }
 
-  @SuppressWarnings("deprecation")
   private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders,
-      AbstractMapOperator mapOp)
-    throws Exception {
+      AbstractMapOperator mapOp) throws Exception {
     List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders);
     // this sets up the map operator contexts correctly
     mapOp.initializeContexts();
@@ -436,10 +413,10 @@ public class MapRecordProcessor extends RecordProcessor {
 
     // this will abort initializeOp()
     if (mapOp != null) {
-      l4j.info("Forwarding abort to mapOp: {} " + mapOp.getName());
+      LOG.info("Forwarding abort to mapOp: {} ", mapOp.getName());
       mapOp.abort();
     } else {
-      l4j.info("mapOp not setup yet. abort not being forwarded");
+      LOG.info("mapOp not setup yet. abort not being forwarded");
     }
   }
 
@@ -450,13 +427,13 @@ public class MapRecordProcessor extends RecordProcessor {
       setAborted(execContext.getIoCxt().getIOExceptions());
     }
 
-    if (cache != null && cacheKeys != null) {
+    if (cache != null) {
       for (String k: cacheKeys) {
         cache.release(k);
       }
     }
 
-    if (dynamicValueCache != null && dynamicValueCacheKeys != null) {
+    if (dynamicValueCache != null) {
       for (String k: dynamicValueCacheKeys) {
         dynamicValueCache.release(k);
       }
@@ -491,7 +468,7 @@ public class MapRecordProcessor extends RecordProcessor {
     } catch (Exception e) {
       if (!isAborted()) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
+        LOG.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing operators", e);
       }
     } finally {
@@ -505,7 +482,7 @@ public class MapRecordProcessor extends RecordProcessor {
     MRInputLegacy theMRInput = null;
 
     // start all mr/multi-mr inputs
-    Set<Input> li = new HashSet<Input>();
+    Set<Input> li = new HashSet<>();
     for (LogicalInput inp: inputs.values()) {
       if (inp instanceof MRInputLegacy || inp instanceof MultiMRInput) {
         inp.start();
@@ -516,7 +493,7 @@ public class MapRecordProcessor extends RecordProcessor {
     // MultiMRInput may not. Fix once TEZ-3302 is resolved.
     processorContext.waitForAllInputsReady(li);
 
-    l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
+    LOG.info("The input names are: {}", String.join(",", inputs.keySet()));
     for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
       if (inp.getValue() instanceof MRInputLegacy) {
         if (theMRInput != null) {
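
Note: the logging changes above follow the standard SLF4J idiom. With
parameterized messages the arguments are only formatted when the level is
enabled, so explicit isDebugEnabled() guards around simple messages become
redundant. A minimal sketch (hypothetical class, real SLF4J API):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

      void report(String attemptId) {
        // Before: concatenation builds the message string even when DEBUG
        // is off, so a guard was needed to avoid the cost.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Setting the LLAP fragment ID for OF to " + attemptId);
        }

        // After: the {} placeholder defers formatting until DEBUG is enabled.
        LOG.debug("Setting the LLAP fragment ID for OF to {}", attemptId);
      }
    }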
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index c55a394..13f5f12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -135,7 +135,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else if (e instanceof InterruptedException) {
-        l4j.info("Hit an interrupt while initializing MergeFileRecordProcessor. Message={}",
+        LOG.info("Hit an interrupt while initializing MergeFileRecordProcessor. Message={}",
             e.getMessage());
         throw (InterruptedException) e;
       } else {
@@ -184,7 +184,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
     } catch (Exception e) {
       if (!isAborted()) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
+        LOG.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing operators",
             e);
       }
@@ -217,7 +217,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        l4j.error(StringUtils.stringifyException(e));
+        LOG.error(StringUtils.stringifyException(e));
         throw new RuntimeException(e);
       }
     }
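
Note: these hunks only switch MergeFileRecordProcessor's call sites from the
inherited l4j field, which this commit makes private in the base class, to a
LOG field (presumably the class's own logger; its declaration is not shown in
these hunks). A hypothetical pair illustrating why per-class private loggers
are preferred over a shared public one in the superclass:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    abstract class BaseProcessor {
      // private: subclasses declare their own logger instead of inheriting this one
      private static final Logger LOG = LoggerFactory.getLogger(BaseProcessor.class);
    }

    class FileProcessor extends BaseProcessor {
      private static final Logger LOG = LoggerFactory.getLogger(FileProcessor.class);

      void close() {
        // attributed to FileProcessor's category, not the base class's
        LOG.error("Hit error while closing operators - failing tree");
      }
    }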
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 8639096..6697f62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 
 /**
  * Process input from tez LogicalInput and write output
@@ -52,7 +51,7 @@ public abstract class RecordProcessor extends InterruptibleProcessing {
   protected Map<String, OutputCollector> outMap;
   protected final ProcessorContext processorContext;
 
-  public static final Logger l4j = LoggerFactory.getLogger(RecordProcessor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RecordProcessor.class);
 
   protected MRTaskReporter reporter;
 
@@ -78,7 +77,7 @@ public abstract class RecordProcessor extends InterruptibleProcessing {
     this.outputs = outputs;
 
     checkAbortCondition();
-    Utilities.tryLoggingClassPaths(jconf, l4j);
+    Utilities.tryLoggingClassPaths(jconf, LOG);
   }
 
   /**
@@ -91,7 +90,7 @@ public abstract class RecordProcessor extends InterruptibleProcessing {
 
   protected void createOutputMap() {
     Preconditions.checkState(outMap == null, "Outputs should only be setup once");
-    outMap = Maps.newHashMap();
+    outMap = new HashMap<>();
     for (Entry<String, LogicalOutput> entry : outputs.entrySet()) {
       TezKVOutputCollector collector = new TezKVOutputCollector(entry.getValue());
       outMap.put(entry.getKey(), collector);
@@ -102,22 +101,17 @@ public abstract class RecordProcessor extends InterruptibleProcessing {
       ObjectCache cache, List<String> cacheKeys) throws HiveException {
     String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
     if (prefixes != null) {
-      List<BaseWork> mergeWorkList = new ArrayList<BaseWork>();
+      List<BaseWork> mergeWorkList = new ArrayList<>();
 
       for (final String prefix : prefixes.split(",")) {
-        if (prefix == null || prefix.isEmpty()) {
+        if (prefix.isEmpty()) {
           continue;
         }
 
         key = prefix;
         cacheKeys.add(key);
 
-        mergeWorkList.add((BaseWork) cache.retrieve(key, new Callable<Object>() {
-          @Override
-          public Object call() {
-            return Utilities.getMergeWork(jconf, prefix);
-          }
-        }));
+        mergeWorkList.add(cache.retrieve(key, () -> Utilities.getMergeWork(jconf, prefix)));
       }
 
       return mergeWorkList;
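
Note: the remaining mechanical changes here drop Guava's Maps.newHashMap()
and explicit constructor type arguments in favor of the Java 7 diamond
operator. A minimal before/after sketch (hypothetical class):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class DiamondSketch {
      // Pre-Java 7 style: type arguments repeated on the right-hand side;
      // factory methods like Maps.newHashMap() existed mainly to avoid this.
      Map<String, String> outMapOld = new HashMap<String, String>();
      List<String> cacheKeysOld = new ArrayList<String>();

      // Diamond operator: the compiler infers the type arguments.
      Map<String, String> outMapNew = new HashMap<>();
      List<String> cacheKeysNew = new ArrayList<>();
    }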
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 152dc98..03edbf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -20,12 +20,11 @@ package org.apache.hadoop.hive.ql.exec.tez;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.slf4j.Logger;
@@ -63,21 +62,20 @@ import com.google.common.collect.Lists;
  * Just pump the records through the query plan.
  */
 public class ReduceRecordProcessor extends RecordProcessor {
+  private static final Logger LOG = LoggerFactory.getLogger(ReduceRecordProcessor.class);
 
   private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
 
-  private ObjectCache cache, dynamicValueCache;
-
-  public static final Logger l4j = LoggerFactory.getLogger(ReduceRecordProcessor.class);
+  private final ObjectCache cache, dynamicValueCache;
 
   private ReduceWork reduceWork;
 
-  List<BaseWork> mergeWorkList = null;
-  List<String> cacheKeys, dynamicValueCacheKeys;
+  private final List<BaseWork> mergeWorkList;
+  private final List<String> cacheKeys;
+  private final List<String> dynamicValueCacheKeys = new ArrayList<>();
 
-  private final Map<Integer, DummyStoreOperator> connectOps =
-      new TreeMap<Integer, DummyStoreOperator>();
-  private final Map<Integer, ReduceWork> tagToReducerMap = new HashMap<Integer, ReduceWork>();
+  private final Map<Integer, DummyStoreOperator> connectOps = new TreeMap<>();
+  private final Map<Integer, ReduceWork> tagToReducerMap = new HashMap<>();
 
   private Operator<?> reducer;
 
@@ -94,22 +92,15 @@ public class ReduceRecordProcessor extends RecordProcessor {
 
     String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
     cacheKeys = Lists.newArrayList(cacheKey);
-    dynamicValueCacheKeys = new ArrayList<String>();
-    reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
-        @Override
-        public Object call() {
-          return Utilities.getReduceWork(jconf);
-      }
-    });
+    reduceWork = cache.retrieve(cacheKey, () -> Utilities.getReduceWork(jconf));
 
     Utilities.setReduceWork(jconf, reduceWork);
     mergeWorkList = getMergeWorkList(jconf, cacheKey, queryId, cache, cacheKeys);
   }
 
   @Override
-  void init(
-      MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
+  void init(MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
+      throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(mrReporter, inputs, outputs);
 
@@ -118,28 +109,22 @@ public class ReduceRecordProcessor extends RecordProcessor {
     // TODO HIVE-14042. Move to using a loop and a timed wait once TEZ-3302 is fixed.
     checkAbortCondition();
     if (shuffleInputs != null) {
-      l4j.info("Waiting for ShuffleInputs to become ready");
+      LOG.info("Waiting for ShuffleInputs to become ready");
       processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs));
     }
 
     connectOps.clear();
     ReduceWork redWork = reduceWork;
-    l4j.info("Main work is " + reduceWork.getName());
+    LOG.info("Main work is " + reduceWork.getName());
     List<HashTableDummyOperator> workOps = reduceWork.getDummyOps();
-    HashSet<HashTableDummyOperator> dummyOps = workOps == null ? null : new HashSet<>(workOps);
+    Set<HashTableDummyOperator> dummyOps = workOps == null ? new HashSet<>() : new HashSet<>(workOps);
     tagToReducerMap.put(redWork.getTag(), redWork);
     if (mergeWorkList != null) {
       for (BaseWork mergeWork : mergeWorkList) {
-        if (l4j.isDebugEnabled()) {
-          l4j.debug("Additional work " + mergeWork.getName());
-        }
+        LOG.debug("Additional work {}", mergeWork.getName());
         workOps = mergeWork.getDummyOps();
         if (workOps != null) {
-          if (dummyOps == null) {
-            dummyOps = new HashSet<>(workOps);
-          } else {
-            dummyOps.addAll(workOps);
-          }
+          dummyOps.addAll(workOps);
         }
         ReduceWork mergeReduceWork = (ReduceWork) mergeWork;
         reducer = mergeReduceWork.getReducer();
@@ -167,19 +152,14 @@ public class ReduceRecordProcessor extends RecordProcessor {
     long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
     if (reducer.getConf() != null) {
       reducer.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
-      l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+      LOG.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
     }
     OperatorUtils.setMemoryAvailable(reducer.getChildOperators(), memoryAvailableToTask);
 
     // Setup values registry
     String valueRegistryKey = DynamicValue.DYNAMIC_VALUE_REGISTRY_CACHE_KEY;
-    DynamicValueRegistryTez registryTez = dynamicValueCache.retrieve(valueRegistryKey,
-        new Callable<DynamicValueRegistryTez>() {
-          @Override
-          public DynamicValueRegistryTez call() {
-            return new DynamicValueRegistryTez();
-          }
-        });
+    DynamicValueRegistryTez registryTez =
+        dynamicValueCache.retrieve(valueRegistryKey, () -> new DynamicValueRegistryTez());
     dynamicValueCacheKeys.add(valueRegistryKey);
     RegistryConfTez registryConf = new RegistryConfTez(jconf, reduceWork, processorContext, inputs);
     registryTez.init(registryConf);
@@ -200,15 +180,15 @@ public class ReduceRecordProcessor extends RecordProcessor {
         reducer = redWork.getReducer();
         // Check immediately after reducer is assigned, in cae the abort came in during
         checkAbortCondition();
-        initializeSourceForTag(redWork, i, mainWorkOIs, sources,
-            redWork.getTagToValueDesc().get(0), redWork.getTagToInput().get(0));
+        initializeSourceForTag(redWork, i, mainWorkOIs, sources, redWork.getTagToValueDesc().get(0),
+            redWork.getTagToInput().get(0));
         reducer.initializeLocalWork(jconf);
       }
       reducer = reduceWork.getReducer();
       // Check immediately after reducer is assigned, in cae the abort came in during
       checkAbortCondition();
       ((TezContext) MapredContext.get()).setRecordSources(sources);
-      reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[bigTablePosition] });
+      reducer.initialize(jconf, new ObjectInspector[] {mainWorkOIs[bigTablePosition]});
       for (int i : tagToReducerMap.keySet()) {
         if (i == bigTablePosition) {
           continue;
@@ -217,7 +197,7 @@ public class ReduceRecordProcessor extends RecordProcessor {
         reducer = redWork.getReducer();
         // Check immediately after reducer is assigned, in cae the abort came in during
         checkAbortCondition();
-        reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] });
+        reducer.initialize(jconf, new ObjectInspector[] {mainWorkOIs[i]});
       }
     }
     checkAbortCondition();
@@ -226,25 +206,21 @@ public class ReduceRecordProcessor extends RecordProcessor {
 
     // initialize reduce operator tree
     try {
-      l4j.info(reducer.dump(0));
+      LOG.info(reducer.dump(0));
 
       // Initialization isn't finished until all parents of all operators
       // are initialized. For broadcast joins that means initializing the
       // dummy parent operators as well.
-      if (dummyOps != null) {
-        for (HashTableDummyOperator dummyOp : dummyOps) {
-          // TODO HIVE-14042. Propagating abort to dummyOps.
-          dummyOp.initialize(jconf, null);
-          checkAbortCondition();
-        }
+      for (HashTableDummyOperator dummyOp : dummyOps) {
+        // TODO HIVE-14042. Propagating abort to dummyOps.
+        dummyOp.initialize(jconf, null);
+        checkAbortCondition();
       }
 
       // set output collector for any reduce sink operators in the pipeline.
-      List<Operator<?>> children = new LinkedList<Operator<?>>();
+      List<Operator<?>> children = new ArrayList<>();
       children.add(reducer);
-      if (dummyOps != null) {
-        children.addAll(dummyOps);
-      }
+      children.addAll(dummyOps);
       createOutputMap();
       OperatorUtils.setChildrenCollector(children, outMap);
 
@@ -258,8 +234,7 @@ public class ReduceRecordProcessor extends RecordProcessor {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else if (e instanceof InterruptedException) {
-        l4j.info("Hit an interrupt while initializing ReduceRecordProcessor. Message={}",
-            e.getMessage());
+        LOG.info("Hit an interrupt while initializing ReduceRecordProcessor. Message={}", e.getMessage());
         throw (InterruptedException) e;
       } else {
         throw new RuntimeException(redWork.getName() + " operator initialization failed", e);
@@ -281,9 +256,8 @@ public class ReduceRecordProcessor extends RecordProcessor {
     }
   }
 
-  private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois,
-      ReduceRecordSource[] sources, TableDesc valueTableDesc, String inputName)
-      throws Exception {
+  private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector[] ois, ReduceRecordSource[] sources,
+      TableDesc valueTableDesc, String inputName) throws Exception {
     reducer = redWork.getReducer();
     reducer.getParentOperators().clear();
     reducer.setParentOperators(null); // clear out any parents as reducer is the root
@@ -295,9 +269,8 @@ public class ReduceRecordProcessor extends RecordProcessor {
     // Only the big table input source should be vectorized (if applicable)
     // Note this behavior may have to change if we ever implement a vectorized merge join
     boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
-    sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
-        valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
-        redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum(),
+    sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, valueTableDesc, reader,
+        tag == bigTablePosition, (byte) tag, redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum(),
         redWork.getVectorizedTestingReducerBatchSize());
     ois[tag] = sources[tag].getObjectInspector();
   }
@@ -306,7 +279,7 @@ public class ReduceRecordProcessor extends RecordProcessor {
   void run() throws Exception {
 
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
-      l4j.info("Starting Output: " + outputEntry.getKey());
+      LOG.info("Starting Output: " + outputEntry.getKey());
       if (!isAborted()) {
         outputEntry.getValue().start();
         ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
@@ -327,10 +300,10 @@ public class ReduceRecordProcessor extends RecordProcessor {
     super.abort();
 
     if (reducer != null) {
-      l4j.info("Forwarding abort to reducer: {} " + reducer.getName());
+      LOG.info("Forwarding abort to reducer: {} " + reducer.getName());
       reducer.abort();
     } else {
-      l4j.info("reducer not setup yet. abort not being forwarded");
+      LOG.info("reducer not setup yet. abort not being forwarded");
     }
   }
 
@@ -356,22 +329,22 @@ public class ReduceRecordProcessor extends RecordProcessor {
   }
 
   @Override
-  void close(){
-    if (cache != null && cacheKeys != null) {
+  void close() {
+    if (cache != null) {
       for (String key : cacheKeys) {
         cache.release(key);
       }
     }
 
-    if (dynamicValueCache != null && dynamicValueCacheKeys != null) {
-      for (String k: dynamicValueCacheKeys) {
+    if (dynamicValueCache != null) {
+      for (String k : dynamicValueCacheKeys) {
         dynamicValueCache.release(k);
       }
     }
 
     try {
       if (isAborted()) {
-        for (ReduceRecordSource rs: sources) {
+        for (ReduceRecordSource rs : sources) {
           if (!rs.close()) {
             setAborted(false); // Preserving the old logic. Hmm...
             break;
@@ -402,9 +375,8 @@ public class ReduceRecordProcessor extends RecordProcessor {
     } catch (Exception e) {
       if (!isAborted()) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
-        throw new RuntimeException(
-            "Hive Runtime Error while closing operators: " + e.getMessage(), e);
+        LOG.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators: " + e.getMessage(), e);
       }
     } finally {
       Utilities.clearWorkMap(jconf);
@@ -418,8 +390,7 @@ public class ReduceRecordProcessor extends RecordProcessor {
         if (childOp instanceof DummyStoreOperator) {
           return (DummyStoreOperator) childOp;
         } else {
-          throw new IllegalStateException("Was expecting dummy store operator but found: "
-              + childOp);
+          throw new IllegalStateException("Was expecting dummy store operator but found: " + childOp);
         }
       } else {
         return getJoinParentOp(childOp);
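
Note: a recurring simplification in ReduceRecordProcessor above is
initializing dummyOps to an empty set instead of null, which lets later code
iterate and call addAll() without null checks. A minimal sketch of the idea
(hypothetical class and method names):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class EmptyCollectionSketch {
      // Before: a null collection forces a guard at every use site.
      static int countGuarded(List<String> workOps) {
        Set<String> dummyOps = workOps == null ? null : new HashSet<>(workOps);
        int n = 0;
        if (dummyOps != null) { // guard needed wherever dummyOps is used
          n = dummyOps.size();
        }
        return n;
      }

      // After: an empty set behaves correctly in loops and bulk operations.
      static int countUnguarded(List<String> workOps) {
        Set<String> dummyOps = workOps == null ? new HashSet<>() : new HashSet<>(workOps);
        return dummyOps.size(); // no guard: size() of an empty set is 0
      }
    }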
