Repository: hive
Updated Branches:
  refs/heads/master afb61aebf -> 346c0ce44


HIVE-20506 - HOS times out when cluster is full while Hive-on-MR waits

(Brock Noland reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/346c0ce4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/346c0ce4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/346c0ce4

Branch: refs/heads/master
Commit: 346c0ce44d89ab030400e017f3e8ec6000344a6e
Parents: afb61ae
Author: Brock Noland <br...@phdata.io>
Authored: Mon Sep 10 15:31:22 2018 -0500
Committer: Brock Noland <br...@phdata.io>
Committed: Mon Sep 10 15:31:57 2018 -0500

----------------------------------------------------------------------
 .../spark/session/SparkSessionManagerImpl.java  |   2 +-
 .../hive/spark/client/SparkClientFactory.java   |   4 +-
 .../spark/client/SparkSubmitSparkClient.java    |  27 ++++-
 .../apache/hive/spark/client/rpc/RpcServer.java | 116 ++++++++++++++++++-
 .../hive/spark/client/TestSparkClient.java      |   2 +-
 .../apache/hive/spark/client/rpc/TestRpc.java   |  85 ++++++++++----
 6 files changed, 207 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/346c0ce4/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 79a56bd..8dae54d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -96,7 +96,7 @@ public class SparkSessionManagerImpl implements 
SparkSessionManager {
           startTimeoutThread();
           Map<String, String> sparkConf = 
HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
           try {
-            SparkClientFactory.initialize(sparkConf);
+            SparkClientFactory.initialize(sparkConf, hiveConf);
             inited = true;
           } catch (IOException e) {
             throw new HiveException("Error initializing SparkClientFactory", 
e);

http://git-wip-us.apache.org/repos/asf/hive/blob/346c0ce4/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 1974e88..54ecdf0 100644
--- 
a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -47,12 +47,12 @@ public final class SparkClientFactory {
    *
    * @param conf Map containing configuration parameters for the client 
library.
    */
-  public static void initialize(Map<String, String> conf) throws IOException {
+  public static void initialize(Map<String, String> conf, HiveConf hiveConf) 
throws IOException {
     if (server == null) {
       synchronized (serverLock) {
         if (server == null) {
           try {
-            server = new RpcServer(conf);
+            server = new RpcServer(conf, hiveConf);
           } catch (InterruptedException ie) {
             throw Throwables.propagate(ie);
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/346c0ce4/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
index 7a6e77b..1879829 100644
--- 
a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
+++ 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -31,6 +31,8 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -51,6 +53,7 @@ class SparkSubmitSparkClient extends AbstractSparkClient {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkSubmitSparkClient.class);
 
+  private static final Pattern YARN_APPLICATION_ID_REGEX = 
Pattern.compile("\\s(application_[0-9]+_[0-9]+)(\\s|$)");
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
   private static final String SPARK_HOME_KEY = "spark.home";
 
@@ -191,17 +194,37 @@ class SparkSubmitSparkClient extends AbstractSparkClient {
     final Process child = pb.start();
     String threadName = Thread.currentThread().getName();
     final List<String> childErrorLog = Collections.synchronizedList(new 
ArrayList<String>());
+    final List<String> childOutLog = Collections.synchronizedList(new 
ArrayList<String>());
     final LogRedirector.LogSourceCallback callback = () -> isAlive;
 
     LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
-        new LogRedirector(child.getInputStream(), LOG, callback));
+        new LogRedirector(child.getInputStream(), LOG, childOutLog, callback));
     LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
         new LogRedirector(child.getErrorStream(), LOG, childErrorLog, 
callback));
 
     runnable = () -> {
       try {
         int exitCode = child.waitFor();
-        if (exitCode != 0) {
+        if (exitCode == 0) {
+          synchronized (childOutLog) {
+            for (String line : childOutLog) {
+              Matcher m = YARN_APPLICATION_ID_REGEX.matcher(line);
+              if (m.find()) {
+                LOG.info("Found application id " + m.group(1));
+                rpcServer.setApplicationId(m.group(1));
+              }
+            }
+          }
+          synchronized (childErrorLog) {
+            for (String line : childErrorLog) {
+              Matcher m = YARN_APPLICATION_ID_REGEX.matcher(line);
+              if (m.find()) {
+                LOG.info("Found application id " + m.group(1));
+                rpcServer.setApplicationId(m.group(1));
+              }
+            }
+          }
+        } else {
           List<String> errorMessages = new ArrayList<>();
           synchronized (childErrorLog) {
             for (String line : childErrorLog) {

http://git-wip-us.apache.org/repos/asf/hive/blob/346c0ce4/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java 
b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 0c67ffd..1f6114a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -61,6 +61,11 @@ import io.netty.util.concurrent.ScheduledFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 
 /**
  * An RPC server. The server matches remote clients based on a secret that is 
generated on
@@ -79,9 +84,12 @@ public class RpcServer implements Closeable {
   private final int port;
   private final ConcurrentMap<String, ClientInfo> pendingClients;
   private final RpcConfiguration config;
+  private String applicationId;
+  private final HiveConf hiveConf;
 
-  public RpcServer(Map<String, String> mapConf) throws IOException, 
InterruptedException {
+  public RpcServer(Map<String, String> mapConf, HiveConf hiveConf) throws 
IOException, InterruptedException {
     this.config = new RpcConfiguration(mapConf);
+    this.hiveConf = hiveConf;
     this.group = new NioEventLoopGroup(
         this.config.getRpcThreadCount(),
         new ThreadFactoryBuilder()
@@ -166,14 +174,116 @@ public class RpcServer implements Closeable {
     return registerClient(clientId, secret, serverDispatcher, 
config.getServerConnectTimeoutMs());
   }
 
+  public void setApplicationId(String applicationId) {
+    this.applicationId = applicationId;
+  }
+
+  /**
+   * This function converts an application in form of a String into a 
ApplicationId.
+   *
+   * @param appIDStr The application id in form of a string
+   * @return the application id as an instance of ApplicationId class.
+   */
+  private static ApplicationId getApplicationIDFromString(String appIDStr) {
+    String[] parts = appIDStr.split("_");
+    if (parts.length < 3) {
+      throw new IllegalStateException("the application id found is not valid. 
application id: " + appIDStr);
+    }
+    long timestamp = Long.parseLong(parts[1]);
+    int id = Integer.parseInt(parts[2]);
+    return ApplicationId.newInstance(timestamp, id);
+  }
+
+  public static boolean isApplicationAccepted(HiveConf conf, String 
applicationId) {
+    if (applicationId == null) {
+      return false;
+    }
+    YarnClient yarnClient = null;
+    try {
+      ApplicationId appId = getApplicationIDFromString(applicationId);
+      yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(conf);
+      yarnClient.start();
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      return appReport != null && appReport.getYarnApplicationState() == 
YarnApplicationState.ACCEPTED;
+    } catch (Exception ex) {
+      LOG.error("Failed getting application status for: " + applicationId + ": 
" + ex, ex);
+      return false;
+    } finally {
+      if (yarnClient != null) {
+        try {
+          yarnClient.stop();
+        } catch (Exception ex) {
+          LOG.error("Failed to stop yarn client: " + ex, ex);
+        }
+      }
+    }
+  }
+
+  static class YarnApplicationStateFinder {
+    public boolean isApplicationAccepted(HiveConf conf, String applicationId) {
+      if (applicationId == null) {
+        return false;
+      }
+      YarnClient yarnClient = null;
+      try {
+        LOG.info("Trying to find " + applicationId);
+        ApplicationId appId = getApplicationIDFromString(applicationId);
+        yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+        return appReport != null && appReport.getYarnApplicationState() == 
YarnApplicationState.ACCEPTED;
+      } catch (Exception ex) {
+        LOG.error("Failed getting application status for: " + applicationId + 
": " + ex, ex);
+        return false;
+      } finally {
+        if (yarnClient != null) {
+          try {
+            yarnClient.stop();
+          } catch (Exception ex) {
+            LOG.error("Failed to stop yarn client: " + ex, ex);
+          }
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
   Future<Rpc> registerClient(final String clientId, String secret,
-      RpcDispatcher serverDispatcher, long clientTimeoutMs) {
+      RpcDispatcher serverDispatcher, final long clientTimeoutMs) {
+    return registerClient(clientId, secret, serverDispatcher, clientTimeoutMs, 
new YarnApplicationStateFinder());
+  }
+
+  @VisibleForTesting
+  Future<Rpc> registerClient(final String clientId, String secret,
+      RpcDispatcher serverDispatcher, long clientTimeoutMs,
+      YarnApplicationStateFinder yarnApplicationStateFinder) {
     final Promise<Rpc> promise = group.next().newPromise();
 
     Runnable timeout = new Runnable() {
       @Override
       public void run() {
+        // check to see if application is in ACCEPTED state, if so, don't set 
failure
+        // if applicationId is not null
+        //   do yarn application -status $applicationId
+        //   if state == ACCEPTED
+        //     reschedule timeout runnable
+        //   else
+        //    set failure as below
+        LOG.info("Trying to find " + applicationId);
+        if (yarnApplicationStateFinder.isApplicationAccepted(hiveConf, 
applicationId)) {
+          final ClientInfo client = pendingClients.get(clientId);
+          if (client != null) {
+            LOG.info("Extending timeout for client " + clientId);
+            ScheduledFuture<?> oldTimeoutFuture = client.timeoutFuture;
+            client.timeoutFuture = group.schedule(this,
+                clientTimeoutMs,
+                TimeUnit.MILLISECONDS);
+            oldTimeoutFuture.cancel(true);
+            return;
+          }
+        }
         promise.setFailure(new TimeoutException(
                 String.format("Client '%s' timed out waiting for connection 
from the Remote Spark" +
                         " Driver", clientId)));
@@ -369,7 +479,7 @@ public class RpcServer implements Closeable {
     final Promise<Rpc> promise;
     final String secret;
     final RpcDispatcher dispatcher;
-    final ScheduledFuture<?> timeoutFuture;
+    ScheduledFuture<?> timeoutFuture;
 
     private ClientInfo(String id, Promise<Rpc> promise, String secret, 
RpcDispatcher dispatcher,
         ScheduledFuture<?> timeoutFuture) {

http://git-wip-us.apache.org/repos/asf/hive/blob/346c0ce4/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java 
b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index d738003..996b24e 100644
--- 
a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ 
b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -339,7 +339,7 @@ public class TestSparkClient {
 
   private void runTest(TestFunction test) throws Exception {
     Map<String, String> conf = createConf();
-    SparkClientFactory.initialize(conf);
+    SparkClientFactory.initialize(conf, HIVECONF);
     SparkClient client = null;
     try {
       test.config(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/346c0ce4/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java 
b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index 013bcff..7da33fe 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -19,7 +19,6 @@ package org.apache.hive.spark.client.rpc;
 
 import java.io.Closeable;
 import java.net.InetAddress;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -64,10 +63,12 @@ public class TestRpc {
   private static final Map<String, String> emptyConfig =
       ImmutableMap.of(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, 
"DEBUG");
   private static final int RETRY_ACQUIRE_PORT_COUNT = 10;
+  private HiveConf hiveConf;
 
   @Before
   public void setUp() {
     closeables = Lists.newArrayList();
+    hiveConf = new HiveConf();
   }
 
   @After
@@ -99,7 +100,7 @@ public class TestRpc {
 
   @Test
   public void testClientServer() throws Exception {
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    RpcServer server = autoClose(new RpcServer(emptyConfig, hiveConf));
     Rpc[] rpcs = createRpcConnection(server);
     Rpc serverRpc = rpcs[0];
     Rpc client = rpcs[1];
@@ -136,25 +137,25 @@ public class TestRpc {
 
     // Test if rpc_server_address is configured
     config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname, 
hostAddress);
-    RpcServer server1 = autoClose(new RpcServer(config));
+    RpcServer server1 = autoClose(new RpcServer(config, hiveConf));
     assertTrue("Host address should match the expected one", 
server1.getAddress() == hostAddress);
 
     // Test if rpc_server_address is not configured but HS2 server host is 
configured
     config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname, "");
     config.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, 
hostAddress);
-    RpcServer server2 = autoClose(new RpcServer(config));
+    RpcServer server2 = autoClose(new RpcServer(config, hiveConf));
     assertTrue("Host address should match the expected one", 
server2.getAddress() == hostAddress);
 
     // Test if both are not configured
     config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname, "");
     config.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, "");
-    RpcServer server3 = autoClose(new RpcServer(config));
+    RpcServer server3 = autoClose(new RpcServer(config, hiveConf));
     assertTrue("Host address should match the expected one", 
server3.getAddress() == InetAddress.getLocalHost().getHostName());
   }
 
   @Test
   public void testBadHello() throws Exception {
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    RpcServer server = autoClose(new RpcServer(emptyConfig, hiveConf));
 
     Future<Rpc> serverRpcFuture = server.registerClient("client", "newClient",
         new TestDispatcher());
@@ -170,7 +171,7 @@ public class TestRpc {
       // On failure, the SASL handler will throw an exception indicating that 
the SASL
       // negotiation failed.
       assertTrue("Unexpected exception: " + ee.getCause(),
-        ee.getCause() instanceof SaslException);
+          ee.getCause() instanceof SaslException);
     }
 
     serverRpcFuture.cancel(true);
@@ -180,12 +181,12 @@ public class TestRpc {
   public void testServerPort() throws Exception {
     Map<String, String> config = new HashMap<String, String>();
 
-    RpcServer server0 = new RpcServer(config);
+    RpcServer server0 = new RpcServer(config, hiveConf);
     assertTrue("Empty port range should return a random valid port: " + 
server0.getPort(), server0.getPort() >= 0);
     IOUtils.closeQuietly(server0);
 
     config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, 
"49152-49222,49223,49224-49333");
-    RpcServer server1 = new RpcServer(config);
+    RpcServer server1 = new RpcServer(config, hiveConf);
     assertTrue("Port should be within configured port range:" + 
server1.getPort(), server1.getPort() >= 49152 && server1.getPort() <= 49333);
     IOUtils.closeQuietly(server1);
 
@@ -194,7 +195,7 @@ public class TestRpc {
     for (int i = 0; i < RETRY_ACQUIRE_PORT_COUNT; i++) {
       try {
         config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, 
String.valueOf(expectedPort));
-        server2 = new RpcServer(config);
+        server2 = new RpcServer(config, hiveConf);
         break;
       } catch (Exception e) {
         LOG.debug("Error while connecting to port " + expectedPort + " 
retrying: " + e.getMessage());
@@ -208,7 +209,7 @@ public class TestRpc {
 
     config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, 
"49552-49222,49223,49224-49333");
     try {
-      autoClose(new RpcServer(config));
+      autoClose(new RpcServer(config, hiveConf));
       assertTrue("Invalid port range should throw an exception", false); // 
Should not reach here
     } catch(IllegalArgumentException e) {
       assertEquals(
@@ -222,7 +223,7 @@ public class TestRpc {
     for (int i = 0; i < RETRY_ACQUIRE_PORT_COUNT; i++) {
       try {
         config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, 
String.valueOf(expectedPort) + ",21-23");
-        server3 = new RpcServer(config);
+        server3 = new RpcServer(config, hiveConf);
         break;
       } catch (Exception e) {
         LOG.debug("Error while connecting to port " + expectedPort + " 
retrying");
@@ -236,7 +237,7 @@ public class TestRpc {
 
   @Test
   public void testCloseListener() throws Exception {
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    RpcServer server = autoClose(new RpcServer(emptyConfig, hiveConf));
     Rpc[] rpcs = createRpcConnection(server);
     Rpc client = rpcs[1];
 
@@ -255,7 +256,7 @@ public class TestRpc {
 
   @Test
   public void testNotDeserializableRpc() throws Exception {
-    RpcServer server = autoClose(new RpcServer(emptyConfig));
+    RpcServer server = autoClose(new RpcServer(emptyConfig, hiveConf));
     Rpc[] rpcs = createRpcConnection(server);
     Rpc client = rpcs[1];
 
@@ -270,10 +271,10 @@ public class TestRpc {
   @Test
   public void testEncryption() throws Exception {
     Map<String, String> eConf = ImmutableMap.<String,String>builder()
-      .putAll(emptyConfig)
-      .put(RpcConfiguration.RPC_SASL_OPT_PREFIX + "qop", Rpc.SASL_AUTH_CONF)
-      .build();
-    RpcServer server = autoClose(new RpcServer(eConf));
+        .putAll(emptyConfig)
+        .put(RpcConfiguration.RPC_SASL_OPT_PREFIX + "qop", Rpc.SASL_AUTH_CONF)
+        .build();
+    RpcServer server = autoClose(new RpcServer(eConf, hiveConf));
     Rpc[] rpcs = createRpcConnection(server, eConf, null);
     Rpc client = rpcs[1];
 
@@ -288,7 +289,7 @@ public class TestRpc {
     Map<String, String> conf = ImmutableMap.<String,String>builder()
       .putAll(emptyConfig)
       .build();
-    RpcServer server = autoClose(new RpcServer(conf));
+    RpcServer server = autoClose(new RpcServer(conf, hiveConf));
     String secret = server.createSecret();
 
     try {
@@ -310,9 +311,53 @@ public class TestRpc {
     }
   }
 
+  static class MockYarnApplicationStateFinder extends 
RpcServer.YarnApplicationStateFinder {
+    private int count = 0;
+    public boolean isApplicationAccepted(HiveConf conf, String applicationId) {
+      return count++ < 10;
+    }
+  }
+
+
+  /**
+   * Tests that we don't timeout with a short timeout but the spark 
application isn't running.
+   */
+  @Test
+  public void testExtendClientTimeout() throws Exception {
+    Map<String, String> conf = ImmutableMap.<String, String>builder()
+        .putAll(emptyConfig)
+        .build();
+    RpcServer server = autoClose(new RpcServer(conf, hiveConf));
+    String secret = server.createSecret();
+    MockYarnApplicationStateFinder yarnApplicationStateFinder = new 
MockYarnApplicationStateFinder();
+    Future<Rpc> promise = server.registerClient("client", secret, new 
TestDispatcher(), 2L,
+        yarnApplicationStateFinder);
+    assertFalse(promise.isDone());
+    Thread.sleep(50);
+    try {
+      promise.get();
+      fail("Server should have timed out client.");
+    } catch (ExecutionException ee) {
+      assertTrue(ee.getCause() instanceof TimeoutException);
+    }
+
+    NioEventLoopGroup eloop = new NioEventLoopGroup();
+    Future<Rpc> clientRpcFuture = Rpc.createClient(conf, eloop,
+        "localhost", server.getPort(), "client", secret, new TestDispatcher());
+    try {
+      autoClose(clientRpcFuture.get());
+      fail("Client should have failed to connect to server.");
+    } catch (ExecutionException ee) {
+      // Error should not be a timeout.
+      assertFalse(ee.getCause() instanceof TimeoutException);
+    }
+  }
+
+
+
   @Test
   public void testRpcServerMultiThread() throws Exception {
-    final RpcServer server = autoClose(new RpcServer(emptyConfig));
+    final RpcServer server = autoClose(new RpcServer(emptyConfig, hiveConf));
     final String msg = "Hello World!";
     Callable<String> callable = () -> {
       Rpc[] rpcs = createRpcConnection(server, emptyConfig, 
UUID.randomUUID().toString());

Reply via email to