Repository: hive Updated Branches: refs/heads/branch-2.1 9ad5c8528 -> bcb0c8fbb
HIVE-14003. Fix a potential hang in LLAP caused by incorrect preemption request handling. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin) (cherry picked from commit a44d9f322f76ec1472c274a4b2bb0f6543976137) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bcb0c8fb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bcb0c8fb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bcb0c8fb Branch: refs/heads/branch-2.1 Commit: bcb0c8fbb35fc4acec14198e722eb4b8fa589ada Parents: 9ad5c85 Author: Siddharth Seth <ss...@apache.org> Authored: Sun Jun 19 12:28:53 2016 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Sun Jun 19 12:30:25 2016 -0700 ---------------------------------------------------------------------- .../hive/llap/daemon/impl/LlapTaskReporter.java | 4 +-- .../hadoop/hive/ql/exec/MapredContext.java | 4 ++- .../apache/hadoop/hive/ql/exec/Operator.java | 1 + .../hive/ql/exec/tez/MapRecordProcessor.java | 33 +++++++++++++++++++- .../ql/exec/tez/MergeFileRecordProcessor.java | 5 +++ .../hive/ql/exec/tez/RecordProcessor.java | 2 ++ .../hive/ql/exec/tez/ReduceRecordProcessor.java | 29 +++++++++++++++-- .../hadoop/hive/ql/exec/tez/TezProcessor.java | 19 ++++++++++- 8 files changed, 90 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index dc4482e..5b5418d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -300,8 +300,8 @@ public class LlapTaskReporter implements TaskReporterInterface { task.setNextFromEventId(response.getNextFromEventId()); task.setNextPreRoutedEventId(response.getNextPreRoutedEventId()); if (response.getEvents() != null && !response.getEvents().isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId=" + if (LOG.isInfoEnabled()) { + LOG.info("Routing events from heartbeat response to task" + ", currentTaskAttemptId=" + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size() + " fromEventId=" + fromEventId + " nextFromEventId=" + response.getNextFromEventId()); http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java index b7ed0c1..7d51658 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java @@ -52,7 +52,9 @@ public class MapredContext { HiveConf.getVar(jobConf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ? 
new TezContext(isMap, jobConf) : new MapredContext(isMap, jobConf); contexts.set(context); - logger.debug("MapredContext initialized."); + if (logger.isDebugEnabled()) { + logger.debug("MapredContext initialized."); + } return context; } http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index be141c2..7b312a5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -503,6 +503,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C } public void abort() { + LOG.info("Received abort in operator: {}", getName()); abortOp.set(true); } http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index f4a9cac..b8ecf89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -113,11 +113,13 @@ public class MapRecordProcessor extends RecordProcessor { Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(mrReporter, inputs, outputs); + checkAbortCondition(); String key = processorContext.getTaskVertexName() + MAP_PLAN_KEY; cacheKeys.add(key); + // create map and fetch operators mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() { @Override @@ -125,6 +127,7 @@ public class 
MapRecordProcessor extends RecordProcessor { return Utilities.getMapWork(jconf); } }); + // TODO HIVE-14042. Cleanup may be required if exiting early. Utilities.setMapWork(jconf, mapWork); String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); @@ -139,6 +142,7 @@ public class MapRecordProcessor extends RecordProcessor { key = processorContext.getTaskVertexName() + prefix; cacheKeys.add(key); + checkAbortCondition(); mergeWorkList.add( (MapWork) cache.retrieve(key, new Callable<Object>() { @@ -155,6 +159,7 @@ public class MapRecordProcessor extends RecordProcessor { ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); // Update JobConf using MRInput, info like filename comes via this + checkAbortCondition(); legacyMRInput = getMRInput(inputs); if (legacyMRInput != null) { Configuration updatedConf = legacyMRInput.getConfigUpdates(); @@ -164,6 +169,7 @@ public class MapRecordProcessor extends RecordProcessor { } } } + checkAbortCondition(); createOutputMap(); // Start all the Outputs. @@ -172,6 +178,7 @@ public class MapRecordProcessor extends RecordProcessor { outputEntry.getValue().start(); ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); } + checkAbortCondition(); try { @@ -181,6 +188,13 @@ public class MapRecordProcessor extends RecordProcessor { } else { mapOp = new MapOperator(runtimeCtx); } + // Not synchronizing creation of mapOp with an invocation. Check immediately + // after creation in case abort has been set. + // Relying on the regular flow to clean up the actual operator. i.e. If an exception is + // thrown, an attempt will be made to cleanup the op. + // If we are here - exit out via an exception. If we're in the middle of the operator.initialize + // call further down, we rely upon op.abort(). 
+ checkAbortCondition(); mapOp.clearConnectedOperators(); mapOp.setExecContext(execContext); @@ -189,6 +203,9 @@ public class MapRecordProcessor extends RecordProcessor { if (mergeWorkList != null) { AbstractMapOperator mergeMapOp = null; for (BaseWork mergeWork : mergeWorkList) { + // TODO HIVE-14042. What is mergeWork, and why is it not part of the regular operator chain. + // The mergeMapOp.initialize call further down can block, and will not receive information + // about an abort request. MapWork mergeMapWork = (MapWork) mergeWork; if (mergeMapWork.getVectorMode()) { mergeMapOp = new VectorMapOperator(runtimeCtx); @@ -256,17 +273,20 @@ public class MapRecordProcessor extends RecordProcessor { l4j.info("Main input name is " + mapWork.getName()); jconf.set(Utilities.INPUT_NAME, mapWork.getName()); mapOp.initialize(jconf, null); + checkAbortCondition(); mapOp.setChildren(jconf); mapOp.passExecContext(execContext); l4j.info(mapOp.dump(0)); mapOp.initializeLocalWork(jconf); + checkAbortCondition(); initializeMapRecordSources(); mapOp.initializeMapOperator(jconf); if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) { for (AbstractMapOperator mergeMapOp : mergeMapOpList) { jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName()); + // TODO HIVE-14042. abort handling: Handling of mergeMapOp mergeMapOp.initializeMapOperator(jconf); } } @@ -279,6 +299,7 @@ public class MapRecordProcessor extends RecordProcessor { if (dummyOps != null) { for (Operator<? extends OperatorDesc> dummyOp : dummyOps){ dummyOp.setExecContext(execContext); + // TODO HIVE-14042. Handling of dummyOps, and propagating abort information to them dummyOp.initialize(jconf, null); } } @@ -293,6 +314,10 @@ public class MapRecordProcessor extends RecordProcessor { // will this be true here? 
// Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; + } else if (e instanceof InterruptedException) { + l4j.info("Hit an interrupt while initializing MapRecordProcessor. Message={}", + e.getMessage()); + throw (InterruptedException) e; } else { throw new RuntimeException("Map operator initialization failed", e); } @@ -368,12 +393,16 @@ public class MapRecordProcessor extends RecordProcessor { @Override public void abort() { - // this will stop run() from pushing records + // this will stop run() from pushing records, along with potentially + // blocking initialization. super.abort(); // this will abort initializeOp() if (mapOp != null) { + l4j.info("Forwarding abort to mapOp: {} " + mapOp.getName()); mapOp.abort(); + } else { + l4j.info("mapOp not setup yet. abort not being forwarded"); } } @@ -439,6 +468,8 @@ public class MapRecordProcessor extends RecordProcessor { li.add(inp); } } + // TODO: HIVE-14042. Potential blocking call. MRInput handles this correctly even if an interrupt is swallowed. + // MultiMRInput may not. Fix once TEZ-3302 is resolved. 
processorContext.waitForAllInputsReady(li); l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray())); http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java index 6fad405..ec97856 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java @@ -73,6 +73,7 @@ public class MergeFileRecordProcessor extends RecordProcessor { void init( MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { + // TODO HIVE-14042. Abort handling. perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(mrReporter, inputs, outputs); execContext = new ExecMapperContext(jconf); @@ -134,6 +135,10 @@ public class MergeFileRecordProcessor extends RecordProcessor { // will this be true here? // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; + } else if (e instanceof InterruptedException) { + l4j.info("Hit an interrupt while initializing MergeFileRecordProcessor. 
Message={}", + e.getMessage()); + throw (InterruptedException) e; } else { throw new RuntimeException("Map operator initialization failed", e); } http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index a373ad6..77c7fa3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -89,6 +89,8 @@ public abstract class RecordProcessor { isLogInfoEnabled = l4j.isInfoEnabled(); isLogTraceEnabled = l4j.isTraceEnabled(); + checkAbortCondition(); + //log classpaths try { if (l4j.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 415df92..1390a00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -120,6 +120,8 @@ public class ReduceRecordProcessor extends RecordProcessor{ MapredContext.init(false, new JobConf(jconf)); List<LogicalInput> shuffleInputs = getShuffleInputs(inputs); + // TODO HIVE-14042. Move to using a loop and a timed wait once TEZ-3302 is fixed. 
+ checkAbortCondition(); if (shuffleInputs != null) { l4j.info("Waiting for ShuffleInputs to become ready"); processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs)); } @@ -132,6 +134,8 @@ public class ReduceRecordProcessor extends RecordProcessor{ for (BaseWork mergeWork : mergeWorkList) { ReduceWork mergeReduceWork = (ReduceWork) mergeWork; reducer = mergeReduceWork.getReducer(); + // Check immediately after reducer is assigned, in case the abort came in during + checkAbortCondition(); DummyStoreOperator dummyStoreOp = getJoinParentOp(reducer); connectOps.put(mergeReduceWork.getTag(), dummyStoreOp); tagToReducerMap.put(mergeReduceWork.getTag(), mergeReduceWork); @@ -139,6 +143,7 @@ public class ReduceRecordProcessor extends RecordProcessor{ ((TezContext) MapredContext.get()).setDummyOpsMap(connectOps); } + checkAbortCondition(); bigTablePosition = (byte) reduceWork.getTag(); @@ -147,6 +152,8 @@ public class ReduceRecordProcessor extends RecordProcessor{ ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext); int numTags = reduceWork.getTagToValueDesc().size(); reducer = reduceWork.getReducer(); + // Check immediately after reducer is assigned, in case the abort came in during + checkAbortCondition(); if (numTags > 1) { sources = new ReduceRecordSource[numTags]; mainWorkOIs = new ObjectInspector[numTags]; @@ -160,11 +167,15 @@ public class ReduceRecordProcessor extends RecordProcessor{ for (int i : tagToReducerMap.keySet()) { redWork = tagToReducerMap.get(i); reducer = redWork.getReducer(); + // Check immediately after reducer is assigned, in case the abort came in during + checkAbortCondition(); initializeSourceForTag(redWork, i, mainWorkOIs, sources, redWork.getTagToValueDesc().get(0), redWork.getTagToInput().get(0)); reducer.initializeLocalWork(jconf); } reducer = reduceWork.getReducer(); + // Check immediately after reducer is assigned, in case the abort came in during + checkAbortCondition(); ((TezContext) 
MapredContext.get()).setRecordSources(sources); reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[bigTablePosition] }); for (int i : tagToReducerMap.keySet()) { @@ -173,9 +184,12 @@ public class ReduceRecordProcessor extends RecordProcessor{ } redWork = tagToReducerMap.get(i); reducer = redWork.getReducer(); + // Check immediately after reducer is assigned, in case the abort came in during + checkAbortCondition(); reducer.initialize(jconf, new ObjectInspector[] { mainWorkOIs[i] }); } } + checkAbortCondition(); reducer = reduceWork.getReducer(); // initialize reduce operator tree @@ -188,7 +202,9 @@ public class ReduceRecordProcessor extends RecordProcessor{ List<HashTableDummyOperator> dummyOps = redWork.getDummyOps(); if (dummyOps != null) { for (HashTableDummyOperator dummyOp : dummyOps) { + // TODO HIVE-14042. Propagating abort to dummyOps. dummyOp.initialize(jconf, null); + checkAbortCondition(); } } @@ -201,6 +217,7 @@ public class ReduceRecordProcessor extends RecordProcessor{ createOutputMap(); OperatorUtils.setChildrenCollector(children, outMap); + checkAbortCondition(); reducer.setReporter(reporter); MapredContext.get().setReporter(reporter); @@ -209,6 +226,10 @@ public class ReduceRecordProcessor extends RecordProcessor{ if (e instanceof OutOfMemoryError) { // Don't create a new object if we are already out of memory throw (OutOfMemoryError) e; + } else if (e instanceof InterruptedException) { + l4j.info("Hit an interrupt while initializing ReduceRecordProcessor. 
Message={}", + e.getMessage()); + throw (InterruptedException) e; } else { throw new RuntimeException("Reduce operator initialization failed", e); } @@ -223,6 +244,7 @@ public class ReduceRecordProcessor extends RecordProcessor{ if (redWork.getTagToValueDesc().get(tag) == null) { continue; } + checkAbortCondition(); initializeSourceForTag(redWork, tag, ois, sources, redWork.getTagToValueDesc().get(tag), redWork.getTagToInput().get(tag)); } @@ -270,12 +292,15 @@ public class ReduceRecordProcessor extends RecordProcessor{ @Override public void abort() { - // this will stop run() from pushing records + // this will stop run() from pushing records, along with potentially + // blocking initialization. super.abort(); - // this will abort initializeOp() if (reducer != null) { + l4j.info("Forwarding abort to reducer: {} " + reducer.getName()); reducer.abort(); + } else { + l4j.info("reducer not setup yet. abort not being forwarded"); } } http://git-wip-us.apache.org/repos/asf/hive/blob/bcb0c8fb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index e7b7e43..486d43a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -149,6 +149,14 @@ public class TezProcessor extends AbstractLogicalIOProcessor { } synchronized (this) { + // This check isn't absolutely mandatory, given the aborted check outside of the + // Processor creation. + if (aborted.get()) { + return; + } + // There should be no blocking operation in RecordProcessor creation, + // otherwise the abort operation will not register since they are synchronized on the same + // lock. 
if (isMap) { rproc = new MapRecordProcessor(jobConf, getContext()); } else { @@ -159,6 +167,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor { if (!aborted.get()) { initializeAndRunProcessor(inputs, outputs); } + // TODO HIVE-14042. In case of an abort request, throw an InterruptedException } protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs, @@ -168,6 +177,10 @@ public class TezProcessor extends AbstractLogicalIOProcessor { try { MRTaskReporter mrReporter = new MRTaskReporter(getContext()); + // Init and run are both potentially long, and blocking operations. Synchronization + // with the 'abort' operation will not work since if they end up blocking on a monitor + // which does not belong to the lock, the abort will end up getting blocked. + // Both of these method invocations need to handle the abort call on their own. rproc.init(mrReporter, inputs, outputs); rproc.run(); @@ -203,13 +216,17 @@ public class TezProcessor extends AbstractLogicalIOProcessor { @Override public void abort() { - aborted.set(true); RecordProcessor rProcLocal; synchronized (this) { + LOG.info("Received abort"); + aborted.set(true); rProcLocal = rproc; } if (rProcLocal != null) { + LOG.info("Forwarding abort to RecordProcessor"); rProcLocal.abort(); + } else { + LOG.info("RecordProcessor not yet setup. Abort will be ignored"); } }