Repository: hive
Updated Branches:
  refs/heads/branch-2.1 9ad5c8528 -> bcb0c8fbb


HIVE-14003. Fix a potential hang in LLAP caused by incorrect preemption request 
handling. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin)

(cherry picked from commit a44d9f322f76ec1472c274a4b2bb0f6543976137)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bcb0c8fb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bcb0c8fb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bcb0c8fb

Branch: refs/heads/branch-2.1
Commit: bcb0c8fbb35fc4acec14198e722eb4b8fa589ada
Parents: 9ad5c85
Author: Siddharth Seth <ss...@apache.org>
Authored: Sun Jun 19 12:28:53 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sun Jun 19 12:30:25 2016 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/LlapTaskReporter.java |  4 +--
 .../hadoop/hive/ql/exec/MapredContext.java      |  4 ++-
 .../apache/hadoop/hive/ql/exec/Operator.java    |  1 +
 .../hive/ql/exec/tez/MapRecordProcessor.java    | 33 +++++++++++++++++++-
 .../ql/exec/tez/MergeFileRecordProcessor.java   |  5 +++
 .../hive/ql/exec/tez/RecordProcessor.java       |  2 ++
 .../hive/ql/exec/tez/ReduceRecordProcessor.java | 29 +++++++++++++++--
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   | 19 ++++++++++-
 8 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index dc4482e..5b5418d 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -300,8 +300,8 @@ public class LlapTaskReporter implements 
TaskReporterInterface {
         task.setNextFromEventId(response.getNextFromEventId());
         task.setNextPreRoutedEventId(response.getNextPreRoutedEventId());
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Routing events from heartbeat response to task" + ", 
currentTaskAttemptId="
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Routing events from heartbeat response to task" + ", 
currentTaskAttemptId="
                 + task.getTaskAttemptID() + ", eventCount=" + 
response.getEvents().size()
                 + " fromEventId=" + fromEventId
                 + " nextFromEventId=" + response.getNextFromEventId());

http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
index b7ed0c1..7d51658 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
@@ -52,7 +52,9 @@ public class MapredContext {
         HiveConf.getVar(jobConf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") 
?
             new TezContext(isMap, jobConf) : new MapredContext(isMap, jobConf);
     contexts.set(context);
-    logger.debug("MapredContext initialized.");
+    if (logger.isDebugEnabled()) {
+      logger.debug("MapredContext initialized.");
+    }
     return context;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index be141c2..7b312a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -503,6 +503,7 @@ public abstract class Operator<T extends OperatorDesc> 
implements Serializable,C
   }
 
   public void abort() {
+    LOG.info("Received abort in operator: {}", getName());
     abortOp.set(true);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index f4a9cac..b8ecf89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -113,11 +113,13 @@ public class MapRecordProcessor extends RecordProcessor {
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) 
throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(mrReporter, inputs, outputs);
+    checkAbortCondition();
 
 
     String key = processorContext.getTaskVertexName() + MAP_PLAN_KEY;
     cacheKeys.add(key);
 
+
     // create map and fetch operators
     mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() {
         @Override
@@ -125,6 +127,7 @@ public class MapRecordProcessor extends RecordProcessor {
           return Utilities.getMapWork(jconf);
         }
       });
+    // TODO HIVE-14042. Cleanup may be required if exiting early.
     Utilities.setMapWork(jconf, mapWork);
 
     String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
@@ -139,6 +142,7 @@ public class MapRecordProcessor extends RecordProcessor {
         key = processorContext.getTaskVertexName() + prefix;
         cacheKeys.add(key);
 
+        checkAbortCondition();
         mergeWorkList.add(
             (MapWork) cache.retrieve(key,
                 new Callable<Object>() {
@@ -155,6 +159,7 @@ public class MapRecordProcessor extends RecordProcessor {
     ((TezContext) 
MapredContext.get()).setTezProcessorContext(processorContext);
 
     // Update JobConf using MRInput, info like filename comes via this
+    checkAbortCondition();
     legacyMRInput = getMRInput(inputs);
     if (legacyMRInput != null) {
       Configuration updatedConf = legacyMRInput.getConfigUpdates();
@@ -164,6 +169,7 @@ public class MapRecordProcessor extends RecordProcessor {
         }
       }
     }
+    checkAbortCondition();
 
     createOutputMap();
     // Start all the Outputs.
@@ -172,6 +178,7 @@ public class MapRecordProcessor extends RecordProcessor {
       outputEntry.getValue().start();
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
+    checkAbortCondition();
 
     try {
 
@@ -181,6 +188,13 @@ public class MapRecordProcessor extends RecordProcessor {
       } else {
         mapOp = new MapOperator(runtimeCtx);
       }
+      // Not synchronizing creation of mapOp with an invocation. Check immediately
+      // after creation in case abort has been set.
+      // Relying on the regular flow to clean up the actual operator. i.e. If an exception is
+      // thrown, an attempt will be made to cleanup the op.
+      // If we are here - exit out via an exception. If we're in the middle of the operator.initialize
+      // call further down, we rely upon op.abort().
+      checkAbortCondition();
 
       mapOp.clearConnectedOperators();
       mapOp.setExecContext(execContext);
@@ -189,6 +203,9 @@ public class MapRecordProcessor extends RecordProcessor {
       if (mergeWorkList != null) {
         AbstractMapOperator mergeMapOp = null;
         for (BaseWork mergeWork : mergeWorkList) {
+          // TODO HIVE-14042. What is mergeWork, and why is it not part of the regular operator chain?
+          // The mergeMapOp.initialize call further down can block, and will not receive information
+          // about an abort request.
           MapWork mergeMapWork = (MapWork) mergeWork;
           if (mergeMapWork.getVectorMode()) {
             mergeMapOp = new VectorMapOperator(runtimeCtx);
@@ -256,17 +273,20 @@ public class MapRecordProcessor extends RecordProcessor {
       l4j.info("Main input name is " + mapWork.getName());
       jconf.set(Utilities.INPUT_NAME, mapWork.getName());
       mapOp.initialize(jconf, null);
+      checkAbortCondition();
       mapOp.setChildren(jconf);
       mapOp.passExecContext(execContext);
       l4j.info(mapOp.dump(0));
 
       mapOp.initializeLocalWork(jconf);
 
+      checkAbortCondition();
       initializeMapRecordSources();
       mapOp.initializeMapOperator(jconf);
       if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
         for (AbstractMapOperator mergeMapOp : mergeMapOpList) {
           jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
+          // TODO HIVE-14042. abort handling: Handling of mergeMapOp
           mergeMapOp.initializeMapOperator(jconf);
         }
       }
@@ -279,6 +299,7 @@ public class MapRecordProcessor extends RecordProcessor {
       if (dummyOps != null) {
         for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
           dummyOp.setExecContext(execContext);
+          // TODO HIVE-14042. Handling of dummyOps, and propagating abort 
information to them
           dummyOp.initialize(jconf, null);
         }
       }
@@ -293,6 +314,10 @@ public class MapRecordProcessor extends RecordProcessor {
         // will this be true here?
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
+      } else if (e instanceof InterruptedException) {
+        l4j.info("Hit an interrupt while initializing MapRecordProcessor. 
Message={}",
+            e.getMessage());
+        throw (InterruptedException) e;
       } else {
         throw new RuntimeException("Map operator initialization failed", e);
       }
@@ -368,12 +393,16 @@ public class MapRecordProcessor extends RecordProcessor {
 
   @Override
   public void abort() {
-    // this will stop run() from pushing records
+    // this will stop run() from pushing records, along with potentially
+    // blocking initialization.
     super.abort();
 
     // this will abort initializeOp()
     if (mapOp != null) {
+      l4j.info("Forwarding abort to mapOp: {} " + mapOp.getName());
       mapOp.abort();
+    } else {
+      l4j.info("mapOp not setup yet. abort not being forwarded");
     }
   }
 
@@ -439,6 +468,8 @@ public class MapRecordProcessor extends RecordProcessor {
         li.add(inp);
       }
     }
+    // TODO: HIVE-14042. Potential blocking call. MRInput handles this 
correctly even if an interrupt is swallowed.
+    // MultiMRInput may not. Fix once TEZ-3302 is resolved.
     processorContext.waitForAllInputsReady(li);
 
     l4j.info("The input names are: " + 
Arrays.toString(inputs.keySet().toArray()));

http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index 6fad405..ec97856 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -73,6 +73,7 @@ public class MergeFileRecordProcessor extends RecordProcessor 
{
   void init(
       MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
+    // TODO HIVE-14042. Abort handling.
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(mrReporter, inputs, outputs);
     execContext = new ExecMapperContext(jconf);
@@ -134,6 +135,10 @@ public class MergeFileRecordProcessor extends 
RecordProcessor {
         // will this be true here?
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
+      } else if (e instanceof InterruptedException) {
+        l4j.info("Hit an interrupt while initializing 
MergeFileRecordProcessor. Message={}",
+            e.getMessage());
+        throw (InterruptedException) e;
       } else {
         throw new RuntimeException("Map operator initialization failed", e);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index a373ad6..77c7fa3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -89,6 +89,8 @@ public abstract class RecordProcessor  {
     isLogInfoEnabled = l4j.isInfoEnabled();
     isLogTraceEnabled = l4j.isTraceEnabled();
 
+    checkAbortCondition();
+
     //log classpaths
     try {
       if (l4j.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 415df92..1390a00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -120,6 +120,8 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
     MapredContext.init(false, new JobConf(jconf));
     List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
+    // TODO HIVE-14042. Move to using a loop and a timed wait once TEZ-3302 is 
fixed.
+    checkAbortCondition();
     if (shuffleInputs != null) {
       l4j.info("Waiting for ShuffleInputs to become ready");
       processorContext.waitForAllInputsReady(new 
ArrayList<Input>(shuffleInputs));
@@ -132,6 +134,8 @@ public class ReduceRecordProcessor  extends RecordProcessor{
       for (BaseWork mergeWork : mergeWorkList) {
         ReduceWork mergeReduceWork = (ReduceWork) mergeWork;
         reducer = mergeReduceWork.getReducer();
+        // Check immediately after reducer is assigned, in case the abort came in during the assignment.
+        checkAbortCondition();
         DummyStoreOperator dummyStoreOp = getJoinParentOp(reducer);
         connectOps.put(mergeReduceWork.getTag(), dummyStoreOp);
         tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork);
@@ -139,6 +143,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
       ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps);
     }
+    checkAbortCondition();
 
     bigTablePosition = (byte) reduceWork.getTag();
 
@@ -147,6 +152,8 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     ((TezContext) 
MapredContext.get()).setTezProcessorContext(processorContext);
     int numTags = reduceWork.getTagToValueDesc().size();
     reducer = reduceWork.getReducer();
+    // Check immediately after reducer is assigned, in case the abort came in during the assignment.
+    checkAbortCondition();
     if (numTags > 1) {
       sources = new ReduceRecordSource[numTags];
       mainWorkOIs = new ObjectInspector[numTags];
@@ -160,11 +167,15 @@ public class ReduceRecordProcessor  extends 
RecordProcessor{
       for (int i : tagToReducerMap.keySet()) {
         redWork = tagToReducerMap.get(i);
         reducer = redWork.getReducer();
+        // Check immediately after reducer is assigned, in case the abort came in during the assignment.
+        checkAbortCondition();
         initializeSourceForTag(redWork, i, mainWorkOIs, sources,
             redWork.getTagToValueDesc().get(0), 
redWork.getTagToInput().get(0));
         reducer.initializeLocalWork(jconf);
       }
       reducer = reduceWork.getReducer();
+      // Check immediately after reducer is assigned, in case the abort came in during the assignment.
+      checkAbortCondition();
       ((TezContext) MapredContext.get()).setRecordSources(sources);
       reducer.initialize(jconf, new ObjectInspector[] { 
mainWorkOIs[bigTablePosition] });
       for (int i : tagToReducerMap.keySet()) {
@@ -173,9 +184,12 @@ public class ReduceRecordProcessor  extends 
RecordProcessor{
         }
         redWork = tagToReducerMap.get(i);
         reducer = redWork.getReducer();
+        // Check immediately after reducer is assigned, in case the abort came in during the assignment.
+        checkAbortCondition();
         reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] });
       }
     }
+    checkAbortCondition();
 
     reducer = reduceWork.getReducer();
     // initialize reduce operator tree
@@ -188,7 +202,9 @@ public class ReduceRecordProcessor  extends RecordProcessor{
       List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
       if (dummyOps != null) {
         for (HashTableDummyOperator dummyOp : dummyOps) {
+          // TODO HIVE-14042. Propagating abort to dummyOps.
           dummyOp.initialize(jconf, null);
+          checkAbortCondition();
         }
       }
 
@@ -201,6 +217,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
       createOutputMap();
       OperatorUtils.setChildrenCollector(children, outMap);
 
+      checkAbortCondition();
       reducer.setReporter(reporter);
       MapredContext.get().setReporter(reporter);
 
@@ -209,6 +226,10 @@ public class ReduceRecordProcessor  extends 
RecordProcessor{
       if (e instanceof OutOfMemoryError) {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
+      } else if (e instanceof InterruptedException) {
+        l4j.info("Hit an interrupt while initializing ReduceRecordProcessor. 
Message={}",
+            e.getMessage());
+        throw (InterruptedException) e;
       } else {
         throw new RuntimeException("Reduce operator initialization failed", e);
       }
@@ -223,6 +244,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
       if (redWork.getTagToValueDesc().get(tag) == null) {
         continue;
       }
+      checkAbortCondition();
       initializeSourceForTag(redWork, tag, ois, sources, 
redWork.getTagToValueDesc().get(tag),
           redWork.getTagToInput().get(tag));
     }
@@ -270,12 +292,15 @@ public class ReduceRecordProcessor  extends 
RecordProcessor{
 
   @Override
   public void abort() {
-    // this will stop run() from pushing records
+    // this will stop run() from pushing records, along with potentially
+    // blocking initialization.
     super.abort();
 
-    // this will abort initializeOp()
     if (reducer != null) {
+      l4j.info("Forwarding abort to reducer: {} " + reducer.getName());
       reducer.abort();
+    } else {
+      l4j.info("reducer not setup yet. abort not being forwarded");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index e7b7e43..486d43a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -149,6 +149,14 @@ public class TezProcessor extends 
AbstractLogicalIOProcessor {
     }
 
     synchronized (this) {
+      // This check isn't absolutely mandatory, given the aborted check 
outside of the
+      // Processor creation.
+      if (aborted.get()) {
+        return;
+      }
+      // There should be no blocking operation in RecordProcessor creation,
+      // otherwise the abort operation will not register since they are 
synchronized on the same
+      // lock.
       if (isMap) {
         rproc = new MapRecordProcessor(jobConf, getContext());
       } else {
@@ -159,6 +167,7 @@ public class TezProcessor extends 
AbstractLogicalIOProcessor {
     if (!aborted.get()) {
       initializeAndRunProcessor(inputs, outputs);
     }
+    // TODO HIVE-14042. In case of an abort request, throw an 
InterruptedException
   }
 
   protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
@@ -168,6 +177,10 @@ public class TezProcessor extends 
AbstractLogicalIOProcessor {
     try {
 
       MRTaskReporter mrReporter = new MRTaskReporter(getContext());
+      // Init and run are both potentially long, and blocking operations. 
Synchronization
+      // with the 'abort' operation will not work since if they end up 
blocking on a monitor
+      // which does not belong to the lock, the abort will end up getting 
blocked.
+      // Both of these method invocations need to handle the abort call on 
their own.
       rproc.init(mrReporter, inputs, outputs);
       rproc.run();
 
@@ -203,13 +216,17 @@ public class TezProcessor extends 
AbstractLogicalIOProcessor {
 
   @Override
   public void abort() {
-    aborted.set(true);
     RecordProcessor rProcLocal;
     synchronized (this) {
+      LOG.info("Received abort");
+      aborted.set(true);
       rProcLocal = rproc;
     }
     if (rProcLocal != null) {
+      LOG.info("Forwarding abort to RecordProcessor");
       rProcLocal.abort();
+    } else {
+      LOG.info("RecordProcessor not yet setup. Abort will be ignored");
     }
   }
 

Reply via email to