Repository: hive
Updated Branches:
  refs/heads/master a2d22b44d -> 71d6e1652


HIVE-18677: SparkClientImpl usage of SessionState.LogHelper doesn't respect isSilent value (Sahil Takiar, reviewed by Peter Vary)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/71d6e165
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/71d6e165
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/71d6e165

Branch: refs/heads/master
Commit: 71d6e16522e1965a2c28d451b0811ce123022d9d
Parents: a2d22b4
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Wed Feb 14 16:01:18 2018 -0800
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Wed Feb 14 16:01:18 2018 -0800

----------------------------------------------------------------------
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  3 +-
 .../hive/ql/exec/spark/SparkUtilities.java      |  9 ++++++
 .../spark/status/RemoteSparkJobMonitor.java     |  1 +
 .../ql/exec/spark/status/SparkJobStatus.java    |  2 ++
 .../spark/status/impl/LocalSparkJobStatus.java  | 16 ++++++++++
 .../spark/status/impl/RemoteSparkJobStatus.java | 31 ++++++++++++++++++++
 .../apache/hive/spark/client/BaseProtocol.java  | 13 --------
 .../apache/hive/spark/client/RemoteDriver.java  | 13 --------
 .../hive/spark/client/SparkClientFactory.java   |  4 +--
 .../hive/spark/client/SparkClientImpl.java      | 10 +------
 .../hive/spark/client/TestSparkClient.java      |  3 +-
 11 files changed, 64 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
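
For context on the change itself: before this patch, SparkClientImpl captured a PrintStream (via SessionState.LogHelper.getInfoStream()) and printed the Spark web UI URL to it directly, bypassing the LogHelper's isSilent check. After this patch, the monitor pulls the URL through the new SparkJobStatus.getWebUIURL() API and prints it via the console LogHelper, which honors isSilent. Below is a minimal caller-side sketch, not part of the patch; obtaining the console via SessionState.getConsole() is an assumption here, mirroring how the Spark job monitor gets its LogHelper.

  import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
  import org.apache.hadoop.hive.ql.session.SessionState;

  class WebUIURLReporter {
    // Assumed to be obtained the same way the Spark job monitor gets its console.
    private final SessionState.LogHelper console = SessionState.getConsole();

    void report(SparkJobStatus sparkJobStatus) {
      // printInfo logs the message and skips console output when the session is silent,
      // which is the behavior HIVE-18677 restores.
      console.printInfo("Hive on Spark Session Web UI URL: " + sparkJobStatus.getWebUIURL());
    }
  }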


http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 93d44dc..c571d1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -101,8 +101,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
   }
 
   private void createRemoteClient() throws Exception {
-    remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId,
-            SessionState.LogHelper.getInfoStream());
+    remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId);
 
     if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) &&
             (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) ||

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 943a4ee..913c9d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -304,4 +306,11 @@ public class SparkUtilities {
     }
     return false;
   }
+
+  public static String reverseDNSLookupURL(String url) throws UnknownHostException {
+    // Run a reverse DNS lookup on the URL
+    URI uri = URI.create(url);
+    InetAddress address = InetAddress.getByName(uri.getHost());
+    return uri.getScheme() + "://" + address.getCanonicalHostName() + ":" + uri.getPort();
+  }
 }
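
The new helper above simply resolves the host in a URL to its canonical name via reverse DNS. A standalone sketch of the same logic, not part of the patch; the input address below is hypothetical and purely for illustration.

  import java.net.InetAddress;
  import java.net.URI;
  import java.net.UnknownHostException;

  class ReverseDnsLookupExample {
    public static void main(String[] args) throws UnknownHostException {
      String url = "http://10.0.0.5:4040";  // hypothetical Spark UI address
      URI uri = URI.create(url);
      // Look up the canonical host name for the URL's host
      InetAddress address = InetAddress.getByName(uri.getHost());
      // Prints e.g. "http://spark-driver.example.com:4040" if reverse DNS resolves the host
      System.out.println(uri.getScheme() + "://" + address.getCanonicalHostName() + ":" + uri.getPort());
    }
  }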

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index fc4e4de..adb65a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -93,6 +93,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
             if (!running) {
              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
               printAppInfo();
+              console.printInfo("Hive on Spark Session Web UI URL: " + sparkJobStatus.getWebUIURL());
               // print job stages.
              console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() +
                  "] stages: " + Arrays.toString(sparkJobStatus.getStageIds()));

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index c2c8fb4..8474afc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -43,6 +43,8 @@ public interface SparkJobStatus {
 
   SparkStatistics getSparkStatistics();
 
+  String getWebUIURL();
+
   void cleanup();
 
   Throwable getError();

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 3e84175..8b031e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
@@ -152,6 +154,20 @@ public class LocalSparkJobStatus implements SparkJobStatus {
   }
 
   @Override
+  public String getWebUIURL() {
+    try {
+      if (sparkContext.sc().uiWebUrl().isDefined()) {
+        return SparkUtilities.reverseDNSLookupURL(sparkContext.sc().uiWebUrl().get());
+      } else {
+        return "UNDEFINED";
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to get web UI URL.", e);
+    }
+    return "UNKNOWN";
+  }
+
+  @Override
   public void cleanup() {
     jobMetricsListener.cleanup(jobId);
     if (cachedRDDIds != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ce07a9f..e950452 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -38,6 +39,8 @@ import org.apache.spark.SparkStageInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 
 import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -138,6 +141,20 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
   }
 
   @Override
+  public String getWebUIURL() {
+    Future<String> getWebUIURL = sparkClient.run(new GetWebUIURLJob());
+    try {
+      return getWebUIURL.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.warn("Failed to get web UI URL.", e);
+      if (Thread.interrupted()) {
+        error = e;
+      }
+      return "UNKNOWN";
+    }
+  }
+
+  @Override
   public void cleanup() {
 
   }
@@ -287,4 +304,18 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
       return jc.sc().sc().applicationId();
     }
   }
+
+  private static class GetWebUIURLJob implements Job<String> {
+
+    public GetWebUIURLJob() {
+    }
+
+    @Override
+    public String call(JobContext jc) throws Exception {
+      if (jc.sc().sc().uiWebUrl().isDefined()) {
+        return SparkUtilities.reverseDNSLookupURL(jc.sc().sc().uiWebUrl().get());
+      }
+      return "UNDEFINED";
+    }
+  }
 }
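
The hunk above replaces the old push-based SparkUIWebURL message (removed from BaseProtocol and RemoteDriver below) with a pull: the client submits a small Job to the remote driver and waits on the returned Future with a timeout. A minimal sketch of that pattern, not part of the patch; the 60-second timeout and the standalone job class are assumptions for illustration, the real code uses the configured sparkClientTimeoutInSeconds and a private GetWebUIURLJob.

  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;

  import org.apache.hive.spark.client.Job;
  import org.apache.hive.spark.client.JobContext;
  import org.apache.hive.spark.client.SparkClient;

  class WebUIURLFetchExample {

    // Runs inside the remote driver, where the SparkContext (and its uiWebUrl) lives.
    private static class WebUIURLJob implements Job<String> {
      @Override
      public String call(JobContext jc) throws Exception {
        return jc.sc().sc().uiWebUrl().isDefined() ? jc.sc().sc().uiWebUrl().get() : "UNDEFINED";
      }
    }

    static String fetchWebUIURL(SparkClient client) {
      // Ship the job to the driver over the existing RPC and bound the wait.
      Future<String> future = client.run(new WebUIURLJob());
      try {
        return future.get(60, TimeUnit.SECONDS);
      } catch (Exception e) {
        return "UNKNOWN";  // same fallback sentinel RemoteSparkJobStatus uses
      }
    }
  }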

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 7290809..7ca89ed 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -165,17 +165,4 @@ abstract class BaseProtocol extends RpcDispatcher {
     }
 
   }
-
-  protected static class SparkUIWebURL<T extends Serializable> implements Serializable {
-
-    final String UIWebURL;
-
-    SparkUIWebURL(String UIWebURL) {
-      this.UIWebURL = UIWebURL;
-    }
-
-    SparkUIWebURL() {
-      this(null);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 66cf339..e584cbb 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -171,14 +171,6 @@ public class RemoteDriver {
       throw e;
     }
 
-    if (jc.sc().sc().uiWebUrl().isDefined()) {
-      // Run a reverse DNS lookup on the URL
-      URI uri = URI.create(jc.sc().sc().uiWebUrl().get());
-      InetAddress address = InetAddress.getByName(uri.getHost());
-      this.protocol.sendUIWebURL(uri.getScheme() + "://" + address.getCanonicalHostName() + ":" +
-              uri.getPort());
-    }
-
     synchronized (jcLock) {
       for (Iterator<JobWrapper<?>> it = jobQueue.iterator(); it.hasNext();) {
         it.next().submit();
@@ -280,11 +272,6 @@ public class RemoteDriver {
      clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics));
     }
 
-    void sendUIWebURL(String UIWebURL) {
-      LOG.debug("Send UIWebURL({}) to Client.", UIWebURL);
-      clientRpc.call(new SparkUIWebURL(UIWebURL));
-    }
-
     private void handle(ChannelHandlerContext ctx, CancelJob msg) {
       JobWrapper<?> job = activeJobs.get(msg.id);
       if (job == null || !cancelJob(job)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 6dfc1a5..fd9b725 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -82,10 +82,10 @@ public final class SparkClientFactory {
    * @param hiveConf Configuration for Hive, contains hive.* properties.
    */
   public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf,
-                                         String sessionId, PrintStream consoleStream)
+                                         String sessionId)
           throws IOException, SparkException {
     Preconditions.checkState(server != null, "initialize() not called.");
-    return new SparkClientImpl(server, sparkConf, hiveConf, sessionId, consoleStream);
+    return new SparkClientImpl(server, sparkConf, hiveConf, sessionId);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 78317ed..214f74b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -89,15 +89,13 @@ class SparkClientImpl implements SparkClient {
   private final Map<String, JobHandleImpl<?>> jobs;
   private final Rpc driverRpc;
   private final ClientProtocol protocol;
-  private final PrintStream consoleStream;
   private volatile boolean isAlive;
 
   SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf,
-                  String sessionid, PrintStream consoleStream) throws IOException {
+                  String sessionid) throws IOException {
     this.conf = conf;
     this.hiveConf = hiveConf;
     this.jobs = Maps.newConcurrentMap();
-    this.consoleStream = consoleStream;
 
     String secret = rpcServer.createSecret();
     this.driverThread = startDriver(rpcServer, sessionid, secret);
@@ -623,12 +621,6 @@ class SparkClientImpl implements SparkClient {
         LOG.warn("Received spark job ID: {} for unknown job {}", 
msg.sparkJobId, msg.clientJobId);
       }
     }
-
-    private void handle(ChannelHandlerContext ctx, SparkUIWebURL msg) {
-      String printMsg = "Hive on Spark Session Web UI URL: " + msg.UIWebURL;
-      consoleStream.println(printMsg);
-      LOG.info(printMsg);
-    }
   }
 
   private static class AddJarJob implements Job<Serializable> {

http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index e66caee..fb31c93 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -303,8 +303,7 @@ public class TestSparkClient {
     SparkClient client = null;
     try {
       test.config(conf);
-      client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString(),
-              mock(PrintStream.class));
+      client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString());
       test.call(client);
     } finally {
       if (client != null) {
