Author: xuefu
Date: Tue Jan 20 19:24:52 2015
New Revision: 1653344

URL: http://svn.apache.org/r1653344
Log:
HIVE-9395: Make WAIT_SUBMISSION_TIMEOUT configurable and check the timeout at the SparkJobMonitor level. [Spark Branch] (Chengxiang via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
Modified:
    hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Jan 20 19:24:52 2015
@@ -1986,7 +1986,12 @@ public class HiveConf extends Configurat
         "hive.spark.client.future.timeout",
         "60s",
         new TimeValidator(TimeUnit.SECONDS),
-        "remote spark client JobHandle future timeout value in seconds.")
+        "Remote Spark client JobHandle future timeout value in seconds."),
+    SPARK_JOB_MONITOR_TIMEOUT(
+        "hive.spark.job.monitor.timeout",
+        "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Spark job monitor timeout if could not get job state in specified 
time interval.")
     ;
 
     public final String varname;

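The new hive.spark.job.monitor.timeout variable goes through HiveConf's standard time-variable handling, so it accepts a unit suffix and is normalized on read. A minimal sketch of reading and overriding it (illustrative fragment, not part of this patch; assumes a HiveConf instance):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;

    HiveConf conf = new HiveConf();
    // Equivalent to "set hive.spark.job.monitor.timeout=120s;" in a Hive session.
    conf.setVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, "120s");
    // Normalized to the requested unit; defaults to 60 when unset.
    long timeoutSecs = conf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
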
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Tue Jan 20 19:24:52 2015
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -136,7 +137,7 @@ public class LocalHiveSparkClient implem
     int jobId = future.jobIds().get(0);
     LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus(
      sc, jobId, jobMetricsListener, sparkCounters, plan.getCachedRDDIds(), future);
-    return new SparkJobRef(Integer.toString(jobId), sparkJobStatus);
+    return new LocalSparkJobRef(Integer.toString(jobId), sparkJobStatus, sc);
   }
 
   /**

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Tue Jan 20 19:24:52 2015
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -122,7 +123,7 @@ public class RemoteHiveSparkClient imple
 
     JobHandle<Serializable> jobHandle = remoteClient.submit(
         new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes));
-    return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
+    return new RemoteSparkJobRef(jobHandle, new RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
   }
 
   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Jan 20 19:24:52 2015
@@ -104,14 +104,18 @@ public class SparkTask extends Task<Spar
 
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (sparkJobStatus != null) {
-        SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
+        SparkJobMonitor monitor = new SparkJobMonitor(conf, sparkJobStatus);
         rc = monitor.startMonitor();
-        // for RSC, we should get the counters after job has finished
         sparkCounters = sparkJobStatus.getCounter();
-        SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
-        if (LOG.isInfoEnabled() && sparkStatistics != null) {
-          LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
-          logSparkStatistic(sparkStatistics);
+        if (rc == 0) {
+          // for RSC, we should get the counters after job has finished
+          SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
+          if (LOG.isInfoEnabled() && sparkStatistics != null) {
+            LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
+            logSparkStatistic(sparkStatistics);
+          }
+        } else if (rc == 2) { // Cancel the job if the monitor detected a submission timeout.
+          jobRef.cancelJob();
         }
         sparkJobStatus.cleanup();
       }

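The SparkTask hunk above also pins down the monitor's return-code contract: 0 means success, 2 means the job was never submitted in time and must be cancelled. A caller-side sketch of that contract (codes taken from this commit's hunks; the snippet itself is illustrative, not committed code):

    int rc = monitor.startMonitor();
    if (rc == 0) {
      // Job finished successfully; counters and Spark statistics are valid.
    } else if (rc == 2) {
      // Submission timed out: cancel so the cluster drops the orphaned job.
      jobRef.cancelJob();
    }
    // rc == 1: monitoring failed with an exception,
    // rc == 3: the Spark job FAILED, rc == 4: state UNKNOWN.
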
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Tue Jan 20 19:24:52 2015
@@ -24,9 +24,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -43,15 +45,19 @@ public class SparkJobMonitor {
 
   private transient LogHelper console;
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+  private final long monitorTimeoutInterval;
   private final int checkInterval = 1000;
   private final int printInterval = 3000;
+  private final HiveConf hiveConf;
   private long lastPrintTime;
   private Set<String> completed;
 
   private SparkJobStatus sparkJobStatus;
 
-  public SparkJobMonitor(SparkJobStatus sparkJobStatus) {
+  public SparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) {
     this.sparkJobStatus = sparkJobStatus;
+    this.hiveConf = hiveConf;
+    monitorTimeoutInterval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
     console = new LogHelper(LOG);
   }
 
@@ -63,19 +69,30 @@ public class SparkJobMonitor {
     int rc = 0;
     JobExecutionStatus lastState = null;
     Map<String, SparkStageProgress> lastProgressMap = null;
-    long startTime = -1;
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
 
+    long startTime = System.currentTimeMillis();
+
     while (true) {
-      JobExecutionStatus state = sparkJobStatus.getState();
       try {
+        JobExecutionStatus state = sparkJobStatus.getState();
         if (LOG.isDebugEnabled()) {
           console.printInfo("state = " + state);
         }
-        if (state != null && state != JobExecutionStatus.UNKNOWN
-          && (state != lastState || state == JobExecutionStatus.RUNNING)) {
+
+        if (state == null) {
+          long timeCount = (System.currentTimeMillis() - startTime)/1000;
+          if (timeCount > monitorTimeoutInterval) {
+            LOG.info("Job hasn't been submitted after " + timeCount + "s. 
Aborting it.");
+            console.printError("Status: " + state);
+            running = false;
+            done = true;
+            rc = 2;
+            break;
+          }
+        } else if (state != lastState || state == JobExecutionStatus.RUNNING) {
           lastState = state;
           Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
 
@@ -92,27 +109,21 @@ public class SparkJobMonitor {
 
               console.printInfo("\nStatus: Running (Hive on Spark job["
                 + sparkJobStatus.getJobId() + "])");
-              startTime = System.currentTimeMillis();
               running = true;
 
               console.printInfo("Job Progress Format\nCurrentTime 
StageId_StageAttemptId: "
                 + 
"SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount 
[StageCost]");
             }
 
-
             printStatus(progressMap, lastProgressMap);
             lastProgressMap = progressMap;
             break;
           case SUCCEEDED:
             printStatus(progressMap, lastProgressMap);
             lastProgressMap = progressMap;
-            if (startTime < 0) {
-              console.printInfo("Status: Finished successfully within a check 
interval.");
-            } else {
-              double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-              console.printInfo("Status: Finished successfully in "
-                + String.format("%.2f seconds", duration));
-            }
+            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+            console.printInfo("Status: Finished successfully in "
+              + String.format("%.2f seconds", duration));
             running = false;
             done = true;
             break;
@@ -120,13 +131,13 @@ public class SparkJobMonitor {
             console.printError("Status: Failed");
             running = false;
             done = true;
-            rc = 2;
+            rc = 3;
             break;
           case UNKNOWN:
             console.printError("Status: Unknown");
             running = false;
             done = true;
-            rc = 2;
+            rc = 4;
             break;
           }
         }
@@ -135,17 +146,14 @@ public class SparkJobMonitor {
         }
       } catch (Exception e) {
         String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
-        if (state == null || state.equals(JobExecutionStatus.UNKNOWN)) {
-          msg = "Job Submission failed" + msg;
-        } else {
-          msg = "Ended Job = " + sparkJobStatus.getJobId() + msg;
-        }
+        msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + 
msg;
 
         // Has to use full name to make sure it does not conflict with
         // org.apache.commons.lang.StringUtils
         LOG.error(msg, e);
        console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
+        done = true;
       } finally {
         if (done) {
           break;

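Distilled, the reworked monitor loop treats a null state as "not yet submitted" and gives up once the configured interval elapses. A simplified sketch of the pattern (progress printing and exception handling omitted; checkInterval is 1000 ms as in the patch):

    long startTime = System.currentTimeMillis();
    while (true) {
      JobExecutionStatus state = sparkJobStatus.getState(); // null until submitted
      if (state == null) {
        long elapsedSecs = (System.currentTimeMillis() - startTime) / 1000;
        if (elapsedSecs > monitorTimeoutInterval) {
          rc = 2; // SparkTask reacts to this by cancelling the job
          break;
        }
      } else {
        // handle RUNNING / SUCCEEDED / FAILED / UNKNOWN as in the hunk above
      }
      Thread.sleep(checkInterval);
    }
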
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java Tue Jan 20 19:24:52 2015
@@ -17,36 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
-public class SparkJobRef {
+public interface SparkJobRef {
 
-  private String jobId;
+  public String getJobId();
 
-  private SparkJobStatus sparkJobStatus;
+  public SparkJobStatus getSparkJobStatus();
 
-  public SparkJobRef() { }
+  public boolean cancelJob();
 
-  public SparkJobRef(String jobId) {
-    this.jobId = jobId;
-  }
-
-  public SparkJobRef(String jobId, SparkJobStatus sparkJobStatus) {
-    this.jobId = jobId;
-    this.sparkJobStatus = sparkJobStatus;
-  }
-
-  public String getJobId() {
-    return jobId;
-  }
-
-  public void setJobId(String jobId) {
-    this.jobId = jobId;
-  }
-
-  public SparkJobStatus getSparkJobStatus() {
-    return sparkJobStatus;
-  }
-
-  public void setSparkJobStatus(SparkJobStatus sparkJobStatus) {
-    this.sparkJobStatus = sparkJobStatus;
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Tue Jan 20 19:24:52 2015
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JobExecutionStatus;
 
@@ -30,11 +31,11 @@ public interface SparkJobStatus {
 
   int getJobId();
 
-  JobExecutionStatus getState();
+  JobExecutionStatus getState() throws HiveException;
 
-  int[] getStageIds();
+  int[] getStageIds() throws HiveException;
 
-  Map<String, SparkStageProgress> getSparkStageProgress();
+  Map<String, SparkStageProgress> getSparkStageProgress() throws HiveException;
 
   SparkCounters getCounter();
 

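With getState(), getStageIds() and getSparkStageProgress() now declaring HiveException, RPC failures surface to the caller instead of being swallowed as null. Call sites take roughly this shape (illustrative only; SparkJobMonitor's own handling is in the hunk above):

    try {
      JobExecutionStatus state = sparkJobStatus.getState();
      // drive the monitoring state machine from the returned state
    } catch (HiveException e) {
      // the monitor logs the failure, sets rc = 1 and stops monitoring
    }
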
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java?rev=1653344&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java Tue Jan 20 19:24:52 2015
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class LocalSparkJobRef implements SparkJobRef {
+
+  private final String jobId;
+  private final SparkJobStatus sparkJobStatus;
+  private final JavaSparkContext javaSparkContext;
+
+  public LocalSparkJobRef(String jobId, SparkJobStatus sparkJobStatus, JavaSparkContext javaSparkContext) {
+    this.jobId = jobId;
+    this.sparkJobStatus = sparkJobStatus;
+    this.javaSparkContext = javaSparkContext;
+  }
+
+  @Override
+  public String getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public SparkJobStatus getSparkJobStatus() {
+    return sparkJobStatus;
+  }
+
+  @Override
+  public boolean cancelJob() {
+    int id = Integer.parseInt(jobId);
+    javaSparkContext.sc().cancelJob(id);
+    return true;
+  }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java?rev=1653344&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java Tue Jan 20 19:24:52 2015
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hive.spark.client.JobHandle;
+
+import java.io.Serializable;
+
+public class RemoteSparkJobRef implements SparkJobRef {
+
+  private final String jobId;
+  private final SparkJobStatus sparkJobStatus;
+  private final JobHandle<Serializable> jobHandler;
+
+  public RemoteSparkJobRef(JobHandle<Serializable> jobHandler, SparkJobStatus sparkJobStatus) {
+    this.jobHandler = jobHandler;
+    this.jobId = jobHandler.getClientJobId();
+    this.sparkJobStatus = sparkJobStatus;
+  }
+
+  @Override
+  public String getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public SparkJobStatus getSparkJobStatus() {
+    return sparkJobStatus;
+  }
+
+  @Override
+  public boolean cancelJob() {
+    return jobHandler.cancel(true);
+  }
+}

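The two SparkJobRef implementations satisfy the same cancelJob() contract through different channels; in essence:

    // LocalSparkJobRef: cancel in-process via the SparkContext, by numeric job id.
    javaSparkContext.sc().cancelJob(Integer.parseInt(jobId));

    // RemoteSparkJobRef: cancel via the remote client's JobHandle,
    // with Future-style mayInterruptIfRunning semantics.
    jobHandler.cancel(true);
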
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1653344&r1=1653343&r2=1653344&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Tue Jan 20 19:24:52 2015
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.spark.client.MetricsCollection;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
@@ -50,11 +51,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class RemoteSparkJobStatus implements SparkJobStatus {
   private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
-  // time (in milliseconds) to wait for a spark job to be submitted on remote cluster
-  // after this period, we decide the job submission has failed so that client won't hang forever
-  private static final long WAIT_SUBMISSION_TIMEOUT = 30000;
-  // remember when the monitor starts
-  private final long startTime;
   private final SparkClient sparkClient;
   private final JobHandle<Serializable> jobHandle;
   private final transient long sparkClientTimeoutInSeconds;
@@ -63,7 +59,6 @@ public class RemoteSparkJobStatus implem
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
     this.sparkClientTimeoutInSeconds = timeoutInSeconds;
-    startTime = System.nanoTime();
   }
 
   @Override
@@ -72,19 +67,19 @@ public class RemoteSparkJobStatus implem
   }
 
   @Override
-  public JobExecutionStatus getState() {
+  public JobExecutionStatus getState() throws HiveException {
     SparkJobInfo sparkJobInfo = getSparkJobInfo();
     return sparkJobInfo != null ? sparkJobInfo.status() : null;
   }
 
   @Override
-  public int[] getStageIds() {
+  public int[] getStageIds() throws HiveException {
     SparkJobInfo sparkJobInfo = getSparkJobInfo();
     return sparkJobInfo != null ? sparkJobInfo.stageIds() : new int[0];
   }
 
   @Override
-  public Map<String, SparkStageProgress> getSparkStageProgress() {
+  public Map<String, SparkStageProgress> getSparkStageProgress() throws HiveException {
     Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
     for (int stageId : getStageIds()) {
       SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId);
@@ -132,27 +127,19 @@ public class RemoteSparkJobStatus implem
 
   }
 
-  private SparkJobInfo getSparkJobInfo() {
+  private SparkJobInfo getSparkJobInfo() throws HiveException {
     Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1
       ? jobHandle.getSparkJobIds().get(0) : null;
     if (sparkJobId == null) {
-      long duration = TimeUnit.MILLISECONDS.convert(
-          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-      if (duration <= WAIT_SUBMISSION_TIMEOUT) {
-        return null;
-      } else {
-        LOG.info("Job hasn't been submitted after " + duration / 1000 + "s. 
Aborting it.");
-        jobHandle.cancel(false);
-        return getDefaultJobInfo(sparkJobId, JobExecutionStatus.FAILED);
-      }
+      return null;
     }
     Future<SparkJobInfo> getJobInfo = sparkClient.run(
         new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
     try {
       return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
-    } catch (Throwable t) {
-      LOG.warn("Error getting job info", t);
-      return null;
+    } catch (Exception e) {
+      LOG.warn("Failed to get job info.", e);
+      throw new HiveException(e);
     }
   }
 

