Author: xuefu
Date: Tue Jan 20 19:24:52 2015
New Revision: 1653344

URL: http://svn.apache.org/r1653344
Log:
HIVE-9395: Make WAIT_SUBMISSION_TIMEOUT configurable and check the timeout at the SparkJobMonitor level. [Spark Branch] (Chengxiang via Xuefu)
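[Editor's note] The patch below adds hive.spark.job.monitor.timeout (default "60s") and reads it through HiveConf.getTimeVar(), replacing the hard-coded WAIT_SUBMISSION_TIMEOUT in RemoteSparkJobStatus. A minimal sketch of how the new setting is resolved; the class name and the "120s" override are illustrative, not part of the patch:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;

public class SparkJobMonitorTimeoutExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Hypothetical override; the default introduced by this patch is "60s".
    conf.setVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, "120s");
    // Same resolution call that SparkJobMonitor's new constructor uses.
    long timeoutSecs = conf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
    System.out.println("Spark job monitor timeout: " + timeoutSecs + "s");
  }
}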
Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
Modified:
    hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Jan 20 19:24:52 2015
@@ -1986,7 +1986,12 @@ public class HiveConf extends Configurat
         "hive.spark.client.future.timeout",
         "60s",
         new TimeValidator(TimeUnit.SECONDS),
-        "remote spark client JobHandle future timeout value in seconds.")
+        "Remote Spark client JobHandle future timeout value in seconds."),
+    SPARK_JOB_MONITOR_TIMEOUT(
+        "hive.spark.job.monitor.timeout",
+        "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Spark job monitor timeout if could not get job state in specified time interval.")
     ;

   public final String varname;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Tue Jan 20 19:24:52 2015
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -136,7 +137,7 @@ public class LocalHiveSparkClient implem
     int jobId = future.jobIds().get(0);
     LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus(
         sc, jobId, jobMetricsListener, sparkCounters, plan.getCachedRDDIds(), future);
-    return new SparkJobRef(Integer.toString(jobId), sparkJobStatus);
+    return new LocalSparkJobRef(Integer.toString(jobId), sparkJobStatus, sc);
   }

   /**

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Tue Jan 20 19:24:52 2015
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -122,7 +123,7 @@ public class RemoteHiveSparkClient imple
     JobHandle<Serializable> jobHandle = remoteClient.submit(
         new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes));

-    return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
+    return new RemoteSparkJobRef(jobHandle, new RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
   }

   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Jan 20 19:24:52 2015
@@ -104,14 +104,18 @@ public class SparkTask extends Task<Spar
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (sparkJobStatus != null) {
-        SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
+        SparkJobMonitor monitor = new SparkJobMonitor(conf, sparkJobStatus);
         rc = monitor.startMonitor();
-        // for RSC, we should get the counters after job has finished
         sparkCounters = sparkJobStatus.getCounter();
-        SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
-        if (LOG.isInfoEnabled() && sparkStatistics != null) {
-          LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
-          logSparkStatistic(sparkStatistics);
+        if (rc == 0 ) {
+          // for RSC, we should get the counters after job has finished
+          SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
+          if (LOG.isInfoEnabled() && sparkStatistics != null) {
+            LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
+            logSparkStatistic(sparkStatistics);
+          }
+        } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
+          jobRef.cancelJob();
         }
         sparkJobStatus.cleanup();
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Tue Jan 20 19:24:52 2015
@@ -24,9 +24,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -43,15 +45,19 @@ public class SparkJobMonitor {
   private transient LogHelper console;
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+  private final long monitorTimeoutInteval;
   private final int checkInterval = 1000;
   private final int printInterval = 3000;
+  private final HiveConf hiveConf;
   private long lastPrintTime;
   private Set<String> completed;

   private SparkJobStatus sparkJobStatus;

-  public SparkJobMonitor(SparkJobStatus sparkJobStatus) {
+  public SparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) {
     this.sparkJobStatus = sparkJobStatus;
+    this.hiveConf = hiveConf;
+    monitorTimeoutInteval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
     console = new LogHelper(LOG);
   }

@@ -63,19 +69,30 @@ public class SparkJobMonitor {
     int rc = 0;
     JobExecutionStatus lastState = null;
     Map<String, SparkStageProgress> lastProgressMap = null;
-    long startTime = -1;

     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+    long startTime = System.currentTimeMillis();
+
     while (true) {
-      JobExecutionStatus state = sparkJobStatus.getState();
       try {
+        JobExecutionStatus state = sparkJobStatus.getState();
         if (LOG.isDebugEnabled()) {
          console.printInfo("state = " + state);
        }
-        if (state != null && state != JobExecutionStatus.UNKNOWN
-            && (state != lastState || state == JobExecutionStatus.RUNNING)) {
+
+        if (state == null) {
+          long timeCount = (System.currentTimeMillis() - startTime)/1000;
+          if (timeCount > monitorTimeoutInteval) {
+            LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+            console.printError("Status: " + state);
+            running = false;
+            done = true;
+            rc = 2;
+            break;
+          }
+        } else if (state != lastState || state == JobExecutionStatus.RUNNING) {
           lastState = state;
           Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
@@ -92,27 +109,21 @@
              console.printInfo("\nStatus: Running (Hive on Spark job[" + sparkJobStatus.getJobId() + "])");
-              startTime = System.currentTimeMillis();
              running = true;

              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
                + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
            }
-
            printStatus(progressMap, lastProgressMap);
            lastProgressMap = progressMap;
            break;
          case SUCCEEDED:
            printStatus(progressMap, lastProgressMap);
            lastProgressMap = progressMap;
-            if (startTime < 0) {
-              console.printInfo("Status: Finished successfully within a check interval.");
-            } else {
-              double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-              console.printInfo("Status: Finished successfully in "
-                + String.format("%.2f seconds", duration));
-            }
+            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+            console.printInfo("Status: Finished successfully in "
+              + String.format("%.2f seconds", duration));
            running = false;
            done = true;
            break;
@@ -120,13 +131,13 @@
            console.printError("Status: Failed");
            running = false;
            done = true;
-            rc = 2;
+            rc = 3;
            break;
          case UNKNOWN:
            console.printError("Status: Unknown");
            running = false;
            done = true;
-            rc = 2;
+            rc = 4;
            break;
          }
        }
@@ -135,17 +146,14 @@
        }
      } catch (Exception e) {
        String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
-        if (state == null || state.equals(JobExecutionStatus.UNKNOWN)) {
-          msg = "Job Submission failed" + msg;
-        } else {
-          msg = "Ended Job = " + sparkJobStatus.getJobId() + msg;
-        }
+        msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg;

        // Has to use full name to make sure it does not conflict with
        // org.apache.commons.lang.StringUtils
        LOG.error(msg, e);
        console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
        rc = 1;
+        done = true;
      } finally {
        if (done) {
          break;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java Tue Jan 20 19:24:52 2015
@@ -17,36 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status;

-public class SparkJobRef {
+public interface SparkJobRef {

-  private String jobId;
+  public String getJobId();

-  private SparkJobStatus sparkJobStatus;
+  public SparkJobStatus getSparkJobStatus();

-  public SparkJobRef() { }
+  public boolean cancelJob();

-  public SparkJobRef(String jobId) {
-    this.jobId = jobId;
-  }
-
-  public SparkJobRef(String jobId, SparkJobStatus sparkJobStatus) {
-    this.jobId = jobId;
-    this.sparkJobStatus = sparkJobStatus;
-  }
-
-  public String getJobId() {
-    return jobId;
-  }
-
-  public void setJobId(String jobId) {
-    this.jobId = jobId;
-  }
-
-  public SparkJobStatus getSparkJobStatus() {
-    return sparkJobStatus;
-  }
-
-  public void setSparkJobStatus(SparkJobStatus sparkJobStatus) {
-    this.sparkJobStatus = sparkJobStatus;
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Tue Jan 20 19:24:52 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status;

 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JobExecutionStatus;

@@ -30,11 +31,11 @@ public interface SparkJobStatus {

   int getJobId();

-  JobExecutionStatus getState();
+  JobExecutionStatus getState() throws HiveException;

-  int[] getStageIds();
+  int[] getStageIds() throws HiveException;

-  Map<String, SparkStageProgress> getSparkStageProgress();
+  Map<String, SparkStageProgress> getSparkStageProgress() throws HiveException;

   SparkCounters getCounter();

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java?rev=1653344&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java Tue Jan 20 19:24:52 2015
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class LocalSparkJobRef implements SparkJobRef {
+
+  private final String jobId;
+  private final SparkJobStatus sparkJobStatus;
+  private final JavaSparkContext javaSparkContext;
+
+  public LocalSparkJobRef(String jobId, SparkJobStatus sparkJobStatus, JavaSparkContext javaSparkContext) {
+    this.jobId = jobId;
+    this.sparkJobStatus = sparkJobStatus;
+    this.javaSparkContext = javaSparkContext;
+  }
+
+  @Override
+  public String getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public SparkJobStatus getSparkJobStatus() {
+    return sparkJobStatus;
+  }
+
+  @Override
+  public boolean cancelJob() {
+    int id = Integer.parseInt(jobId);
+    javaSparkContext.sc().cancelJob(id);
+    return true;
+  }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java?rev=1653344&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java Tue Jan 20 19:24:52 2015
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hive.spark.client.JobHandle;
+
+import java.io.Serializable;
+
+public class RemoteSparkJobRef implements SparkJobRef {
+
+  private final String jobId;
+  private final SparkJobStatus sparkJobStatus;
+  private final JobHandle<Serializable> jobHandler;
+
+  public RemoteSparkJobRef(JobHandle<Serializable> jobHandler, SparkJobStatus sparkJobStatus) {
+    this.jobHandler = jobHandler;
+    this.jobId = jobHandler.getClientJobId();
+    this.sparkJobStatus = sparkJobStatus;
+  }
+
+  @Override
+  public String getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public SparkJobStatus getSparkJobStatus() {
+    return sparkJobStatus;
+  }
+
+  @Override
+  public boolean cancelJob() {
+    return jobHandler.cancel(true);
+  }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Tue Jan 20 19:24:52 2015
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.spark.client.MetricsCollection;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
@@ -50,11 +51,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class RemoteSparkJobStatus implements SparkJobStatus {
   private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
-  // time (in milliseconds) to wait for a spark job to be submitted on remote cluster
-  // after this period, we decide the job submission has failed so that client won't hang forever
-  private static final long WAIT_SUBMISSION_TIMEOUT = 30000;
-  // remember when the monitor starts
-  private final long startTime;
   private final SparkClient sparkClient;
   private final JobHandle<Serializable> jobHandle;
   private final transient long sparkClientTimeoutInSeconds;
@@ -63,7 +59,6 @@ public class RemoteSparkJobStatus implem
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
     this.sparkClientTimeoutInSeconds = timeoutInSeconds;
-    startTime = System.nanoTime();
   }

   @Override
@@ -72,19 +67,19 @@ public class RemoteSparkJobStatus implem
   }

   @Override
-  public JobExecutionStatus getState() {
+  public JobExecutionStatus getState() throws HiveException {
     SparkJobInfo sparkJobInfo = getSparkJobInfo();
     return sparkJobInfo != null ? sparkJobInfo.status() : null;
   }

   @Override
-  public int[] getStageIds() {
+  public int[] getStageIds() throws HiveException {
     SparkJobInfo sparkJobInfo = getSparkJobInfo();
     return sparkJobInfo != null ? sparkJobInfo.stageIds() : new int[0];
   }

   @Override
-  public Map<String, SparkStageProgress> getSparkStageProgress() {
+  public Map<String, SparkStageProgress> getSparkStageProgress() throws HiveException {
     Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
     for (int stageId : getStageIds()) {
       SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId);
@@ -132,27 +127,19 @@
   }

-  private SparkJobInfo getSparkJobInfo() {
+  private SparkJobInfo getSparkJobInfo() throws HiveException {
     Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
         jobHandle.getSparkJobIds().get(0) : null;
     if (sparkJobId == null) {
-      long duration = TimeUnit.MILLISECONDS.convert(
-          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-      if (duration <= WAIT_SUBMISSION_TIMEOUT) {
-        return null;
-      } else {
-        LOG.info("Job hasn't been submitted after " + duration / 1000 + "s. Aborting it.");
-        jobHandle.cancel(false);
-        return getDefaultJobInfo(sparkJobId, JobExecutionStatus.FAILED);
-      }
+      return null;
     }

     Future<SparkJobInfo> getJobInfo = sparkClient.run(
         new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
     try {
       return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
-    } catch (Throwable t) {
-      LOG.warn("Error getting job info", t);
-      return null;
+    } catch (Exception e) {
+      LOG.warn("Failed to get job info.", e);
+      throw new HiveException(e);
     }
   }
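[Editor's note] With this change, SparkJobMonitor.startMonitor() returns 0 on success, 1 on a monitoring exception, 2 when no job state arrives within hive.spark.job.monitor.timeout (SparkTask then calls jobRef.cancelJob()), 3 when the job reports FAILED, and 4 when it reports UNKNOWN. The stand-alone sketch below mirrors the submission-timeout loop added above; the class name and the state probe are illustrative stand-ins for SparkJobStatus.getState(), not part of the patch:

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class SubmissionTimeoutSketch {

  // Polls a state probe once a second; null means "not submitted yet".
  // Returns 0 once a state is seen, or 2 after timeoutSecs (mirroring rc == 2 above).
  static int waitForSubmission(Callable<Object> stateProbe, long timeoutSecs) throws Exception {
    long startTime = System.currentTimeMillis();
    while (true) {
      Object state = stateProbe.call();
      if (state != null) {
        return 0; // submitted; the real monitor keeps polling stage progress from here
      }
      long timeCount = (System.currentTimeMillis() - startTime) / 1000;
      if (timeCount > timeoutSecs) {
        return 2; // caller would cancel the job, as SparkTask does on rc == 2
      }
      TimeUnit.MILLISECONDS.sleep(1000); // checkInterval in SparkJobMonitor
    }
  }

  public static void main(String[] args) throws Exception {
    // A probe that never reports a state times out after the illustrative 3-second limit.
    int rc = waitForSubmission(new Callable<Object>() {
      @Override
      public Object call() {
        return null;
      }
    }, 3);
    System.out.println("rc = " + rc);
  }
}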