Repository: hive
Updated Branches:
  refs/heads/llap 95a959ff5 -> 1d0881e04
LLAP: NPE when calling abort on the TezProcessor. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d0881e0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d0881e0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d0881e0 Branch: refs/heads/llap Commit: 1d0881e04e1aa9dd10dde8425427f29a53bee97a Parents: 95a959f Author: Siddharth Seth <ss...@apache.org> Authored: Wed May 20 18:45:13 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Wed May 20 18:45:13 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/tez/TezProcessor.java | 21 ++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1d0881e0/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index f8c5314..9baa0c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.text.NumberFormat; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,6 +51,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor { protected boolean isMap = false; protected RecordProcessor rproc = null; + private final AtomicBoolean aborted = new AtomicBoolean(false); protected JobConf jobConf; @@ -115,26 +117,34 @@ public class TezProcessor extends AbstractLogicalIOProcessor { String taskAttemptIdStr = 
taskAttemptIdBuilder.toString(); this.jobConf.set("mapred.task.id", taskAttemptIdStr); this.jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr); - this.jobConf.setInt("mapred.task.partition",processorContext.getTaskIndex()); + this.jobConf.setInt("mapred.task.partition", processorContext.getTaskIndex()); } @Override public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { + if (aborted.get()) { + return; + } + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR); // in case of broadcast-join read the broadcast edge inputs // (possibly asynchronously) LOG.info("Running task: " + getContext().getUniqueIdentifier()); + synchronized (this) { if (isMap) { rproc = new MapRecordProcessor(jobConf, getContext()); } else { rproc = new ReduceRecordProcessor(jobConf, getContext()); } + } + if (!aborted.get()) { initializeAndRunProcessor(inputs, outputs); + } } protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs, @@ -174,7 +184,14 @@ public class TezProcessor extends AbstractLogicalIOProcessor { } public void abort() { - rproc.abort(); + aborted.set(true); + RecordProcessor rProcLocal; + synchronized (this) { + rProcLocal = rproc; + } + if (rProcLocal != null) { + rProcLocal.abort(); + } } /**