Repository: hive
Updated Branches:
  refs/heads/llap 95a959ff5 -> 1d0881e04


LLAP: NPE when calling abort on the TezProcessor. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d0881e0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d0881e0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d0881e0

Branch: refs/heads/llap
Commit: 1d0881e04e1aa9dd10dde8425427f29a53bee97a
Parents: 95a959f
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 20 18:45:13 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 20 18:45:13 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   | 21 ++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1d0881e0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index f8c5314..9baa0c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.text.NumberFormat;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,6 +51,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
   protected boolean isMap = false;
 
   protected RecordProcessor rproc = null;
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
 
   protected JobConf jobConf;
 
@@ -115,26 +117,34 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     String taskAttemptIdStr = taskAttemptIdBuilder.toString();
     this.jobConf.set("mapred.task.id", taskAttemptIdStr);
     this.jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr);
-    this.jobConf.setInt("mapred.task.partition",processorContext.getTaskIndex());
+    this.jobConf.setInt("mapred.task.partition", processorContext.getTaskIndex());
   }
 
   @Override
   public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
       throws Exception {
 
+    if (aborted.get()) {
+      return;
+    }
+
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
       // in case of broadcast-join read the broadcast edge inputs
       // (possibly asynchronously)
 
       LOG.info("Running task: " + getContext().getUniqueIdentifier());
 
+    synchronized (this) {
       if (isMap) {
         rproc = new MapRecordProcessor(jobConf, getContext());
       } else {
         rproc = new ReduceRecordProcessor(jobConf, getContext());
       }
+    }
 
+    if (!aborted.get()) {
       initializeAndRunProcessor(inputs, outputs);
+    }
   }
 
   protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
@@ -174,7 +184,14 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
   }
 
   public void abort() {
-    rproc.abort();
+    aborted.set(true);
+    RecordProcessor rProcLocal;
+    synchronized (this) {
+      rProcLocal = rproc;
+    }
+    if (rProcLocal != null) {
+      rProcLocal.abort();
+    }
   }
 
   /**

Reply via email to