Repository: hive
Updated Branches:
  refs/heads/master d7f71fb4a -> e45da2694


HIVE-15772: set the exception into SparkJobStatus if exception happened in RemoteSparkJobMonitor and LocalSparkJobMonitor (Zhihai Xu, reviewed by Chao Sun)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e45da269
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e45da269
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e45da269

Branch: refs/heads/master
Commit: e45da2694a7841427937392b46ac33a8f6ae3fcb
Parents: d7f71fb
Author: Zhihai Xu <zhihaixu2...@gmail.com>
Authored: Fri Feb 3 10:36:15 2017 -0800
Committer: Chao Sun <sunc...@apache.org>
Committed: Fri Feb 3 10:36:27 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/spark/status/LocalSparkJobMonitor.java   |  1 +
 .../hive/ql/exec/spark/status/RemoteSparkJobMonitor.java  |  1 +
 .../hadoop/hive/ql/exec/spark/status/SparkJobStatus.java  |  2 ++
 .../ql/exec/spark/status/impl/LocalSparkJobStatus.java    | 10 ++++++++++
 .../ql/exec/spark/status/impl/RemoteSparkJobStatus.java   | 10 ++++++++++
 5 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index b6d128b..a678228 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -128,6 +128,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
         console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
+        sparkJobStatus.setError(e);
       } finally {
         if (done) {
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 77038fc..ef3d8f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -158,6 +158,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
         console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
+        sparkJobStatus.setError(e);
       } finally {
         if (done) {
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index 72ce439..1ebb1ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -46,4 +46,6 @@ public interface SparkJobStatus {
   void cleanup();
 
   Throwable getError();
+
+  void setError(Throwable e);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index a94d4ed..ab8a9cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -49,6 +49,7 @@ public class LocalSparkJobStatus implements SparkJobStatus {
   private SparkCounters sparkCounters;
   private JavaFutureAction<Void> future;
   private Set<Integer> cachedRDDIds;
+  private Throwable error;
 
   public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
       JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
@@ -59,6 +60,7 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     this.sparkCounters = sparkCounters;
     this.cachedRDDIds = cachedRDDIds;
     this.future = future;
+    this.error = null;
   }
 
   @Override
@@ -161,6 +163,9 @@ public class LocalSparkJobStatus implements SparkJobStatus {
 
   @Override
   public Throwable getError() {
+    if (error != null) {
+      return error;
+    }
     if (future.isDone()) {
       try {
         future.get();
@@ -171,6 +176,11 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     return null;
   }
 
+  @Override
+  public void setError(Throwable e) {
+    this.error = e;
+  }
+
   private SparkJobInfo getJobInfo() {
     return sparkContext.statusTracker().getJobInfo(jobId);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index e87a21a..0e3e541 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -50,11 +50,13 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
   private static final Logger LOG = LoggerFactory.getLogger(RemoteSparkJobStatus.class.getName());
   private final SparkClient sparkClient;
   private final JobHandle<Serializable> jobHandle;
+  private Throwable error;
   private final transient long sparkClientTimeoutInSeconds;
 
   public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle, long timeoutInSeconds) {
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
+    this.error = null;
     this.sparkClientTimeoutInSeconds = timeoutInSeconds;
   }
 
@@ -138,9 +140,17 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
 
   @Override
   public Throwable getError() {
+    if (error != null) {
+      return error;
+    }
     return jobHandle.getError();
   }
 
+  @Override
+  public void setError(Throwable e) {
+    this.error = e;
+  }
+
   private SparkJobInfo getSparkJobInfo() throws HiveException {
     Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1
       ? jobHandle.getSparkJobIds().get(0) : null;