HIVE-13679: Pass diagnostic message to failure hooks (Jimmy Xiang, reviewed by Aihua Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b1e273e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b1e273e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b1e273e Branch: refs/heads/java8 Commit: 2b1e273e44fe367c12167409e8552efa2770ae7e Parents: b870d52 Author: Jimmy Xiang <jxi...@apache.org> Authored: Tue May 3 14:48:09 2016 -0700 Committer: Jimmy Xiang <jxi...@apache.org> Committed: Fri May 6 07:41:43 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hive/ql/Driver.java | 11 ++++++---- .../org/apache/hadoop/hive/ql/exec/Task.java | 21 ++++++++++++++++---- .../apache/hadoop/hive/ql/exec/TaskResult.java | 7 +++++-- .../apache/hadoop/hive/ql/exec/TaskRunner.java | 5 ++++- .../hive/ql/exec/mr/HadoopJobExecHelper.java | 1 + .../hadoop/hive/ql/exec/mr/JobDebugger.java | 18 +++++++++++------ 6 files changed, 46 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 6a610cb..3fecc5c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -118,6 +118,7 @@ import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; @@ -1598,7 +1599,8 @@ public class Driver implements CommandProcessor { } else { setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); - invokeFailureHooks(perfLogger, hookContext, result.getTaskError()); + invokeFailureHooks(perfLogger, hookContext, + errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError()); SQLState = "08S01"; console.printError(errorMessage); driverCxt.shutdown(); @@ -1634,7 +1636,7 @@ public class Driver implements CommandProcessor { if (driverCxt.isShutdown()) { SQLState = "HY008"; errorMessage = "FAILED: Operation cancelled"; - invokeFailureHooks(perfLogger, hookContext, null); + invokeFailureHooks(perfLogger, hookContext, errorMessage, null); console.printError(errorMessage); return 1000; } @@ -1691,7 +1693,7 @@ public class Driver implements CommandProcessor { errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); if (hookContext != null) { try { - invokeFailureHooks(perfLogger, hookContext, e); + invokeFailureHooks(perfLogger, hookContext, errorMessage, e); } catch (Exception t) { LOG.warn("Failed to invoke failure hook", t); } @@ -1790,7 +1792,8 @@ public class Driver implements CommandProcessor { } } - private void invokeFailureHooks(PerfLogger perfLogger, HookContext hookContext, Throwable exception) throws Exception { + private void invokeFailureHooks(PerfLogger perfLogger, + HookContext hookContext, String errorMessage, Throwable exception) throws Exception { hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK); hookContext.setErrorMessage(errorMessage); hookContext.setException(exception); http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 34bdafd..eeaa543 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -27,10 +27,12 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.*; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.QueryDisplay; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -40,6 +42,8 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Task implementation. @@ -84,8 +88,17 @@ public abstract class Task<T extends Serializable> implements Serializable, Node protected T work; private TaskState taskState = TaskState.CREATED; private String statusMessage; + private String diagnosticMesg; private transient boolean fetchSource; + public void setDiagnosticMessage(String diagnosticMesg) { + this.diagnosticMesg = diagnosticMesg; + } + + public String getDiagnosticsMessage() { + return diagnosticMesg; + } + public void setStatusMessage(String statusMessage) { this.statusMessage = statusMessage; updateStatusInQueryDisplay(); @@ -321,7 +334,7 @@ public abstract class Task<T extends Serializable> implements Serializable, Node return ret; } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public static List<Task<? extends Serializable>> findLeafs(List<Task<? extends Serializable>> rootTasks) { final List<Task<? extends Serializable>> leafTasks = new ArrayList<Task<?>>(); http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java index def9389..3c4ee17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java @@ -37,10 +37,13 @@ public class TaskResult { this.exitVal = exitVal; setRunning(false); } - public void setExitVal(int exitVal, Throwable taskError) { - this.setExitVal(exitVal); + public void setTaskError(Throwable taskError) { this.taskError = taskError; } + public void setExitVal(int exitVal, Throwable taskError) { + setExitVal(exitVal); + setTaskError(taskError); + } public int getExitVal() { return exitVal; http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java index 81f6db0..a596e92 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java @@ -104,7 +104,10 @@ public class TaskRunner extends Thread { } LOG.error("Error in executeTask", t); } - result.setExitVal(exitVal, tsk.getException()); + result.setExitVal(exitVal); + if (tsk.getException() != null) { + result.setTaskError(tsk.getException()); + } } public static long getTaskRunnerID () { http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 11f5cfd..c15316bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -574,6 +574,7 @@ public class HadoopJobExecHelper { Thread t = new Thread(jd); t.start(); t.join(HiveConf.getIntVar(job, HiveConf.ConfVars.JOB_DEBUG_TIMEOUT)); + task.setDiagnosticMessage(jd.getDiagnosticMesg()); int ec = jd.getErrorCode(); if (ec > 0) { returnVal = ec; http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java index 6e4e3bf..d320536 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java @@ -53,6 +53,7 @@ public class JobDebugger implements Runnable { private final Map<String, Integer> failures = new HashMap<String, Integer>(); private final Set<String> successes = new HashSet<String>(); // Successful task ID's private final Map<String, TaskInfo> taskIdToInfo = new HashMap<String, TaskInfo>(); + private String diagnosticMesg; private int maxFailures = 0; // Used for showJobFailDebugInfo @@ -115,7 +116,7 @@ public class JobDebugger implements Runnable { public void run() { try { - showJobFailDebugInfo(); + diagnosticMesg = showJobFailDebugInfo(); } catch (IOException e) { console.printError(e.getMessage()); } @@ -216,8 +217,7 @@ public class JobDebugger implements Runnable { } } - @SuppressWarnings("deprecation") - private void showJobFailDebugInfo() throws IOException { + private String showJobFailDebugInfo() throws IOException { console.printError("Error during job, obtaining debugging information..."); if (!conf.get("mapred.job.tracker", "local").equals("local")) { // Show Tracking URL for remotely running jobs. @@ -241,7 +241,7 @@ public class JobDebugger implements Runnable { } if (failures.keySet().size() == 0) { - return; + return null; } // Find the highest failure count computeMaxFailures() ; @@ -255,6 +255,7 @@ public class JobDebugger implements Runnable { + e.getMessage()); } + String msg = null; for (String task : failures.keySet()) { if (failures.get(task).intValue() == maxFailures) { TaskInfo ti = taskIdToInfo.get(task); @@ -303,14 +304,19 @@ public class JobDebugger implements Runnable { for (String mesg : diagMesgs) { sb.append(mesg + "\n"); } - console.printError(sb.toString()); + msg = sb.toString(); + console.printError(msg); } // Only print out one task because that's good enough for debugging. break; } } - return; + return msg; + } + + public String getDiagnosticMesg() { + return diagnosticMesg; } public int getErrorCode() {