Repository: hive Updated Branches: refs/heads/master dd2bdfc67 -> 0e54991d8
HIVE-11581: HiveServer2 should store connection params in ZK when using dynamic service discovery for simpler client connection string (Vaibhav Gumashta reviewed by Thejas Nair) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e54991d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e54991d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e54991d Branch: refs/heads/master Commit: 0e54991d897c9acc26b015b6df82b44c0c90c6fb Parents: dd2bdfc Author: Vaibhav Gumashta <vgumas...@apache.org> Authored: Mon Aug 24 17:14:27 2015 -0700 Committer: Vaibhav Gumashta <vgumas...@apache.org> Committed: Mon Aug 24 17:15:22 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 72 ++++++------ .../org/apache/hive/jdbc/HiveConnection.java | 4 +- jdbc/src/java/org/apache/hive/jdbc/Utils.java | 117 +++++++------------ .../hive/jdbc/ZooKeeperHiveClientHelper.java | 104 ++++++++++++++--- .../apache/hive/service/server/HiveServer2.java | 74 +++++++++++- 5 files changed, 239 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index da171b1..8706a2d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1683,22 +1683,6 @@ public class HiveConf extends Configuration { "to construct a list exception handlers to handle exceptions thrown\n" + "by record readers"), - // operation log configuration - HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true, - "When true, HS2 will save operation logs and make them available for clients"), - HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION("hive.server2.logging.operation.log.location", - "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator + - "operation_logs", - "Top level directory where operation logs are stored if logging functionality is enabled"), - HIVE_SERVER2_LOGGING_OPERATION_LEVEL("hive.server2.logging.operation.level", "EXECUTION", - new StringSet("NONE", "EXECUTION", "PERFORMANCE", "VERBOSE"), - "HS2 operation logging mode available to clients to be set at session level.\n" + - "For this to work, hive.server2.logging.operation.enabled should be set to true.\n" + - " NONE: Ignore any logging\n" + - " EXECUTION: Log completion of tasks\n" + - " PERFORMANCE: Execution + Performance logs \n" + - " VERBOSE: All logs" ), - HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."), // logging configuration HIVE_LOG4J_FILE("hive.log4j.file", "", "Hive log4j configuration file.\n" + @@ -1790,6 +1774,7 @@ public class HiveConf extends Configuration { "hive.zookeeper.quorum in their connection string."), HIVE_SERVER2_ZOOKEEPER_NAMESPACE("hive.server2.zookeeper.namespace", "hiveserver2", "The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."), + // HiveServer2 global init file location HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}", "Either the location of a HS2 global init file or a directory containing a .hiverc file. If the \n" + @@ -1801,6 +1786,39 @@ public class HiveConf extends Configuration { HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" + "enable parallel compilation between sessions on HiveServer2. The default is false."), + // Tez session settings + HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "", + "A list of comma separated values corresponding to YARN queues of the same name.\n" + + "When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" + + "for multiple Tez sessions to run in parallel on the cluster."), + HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE("hive.server2.tez.sessions.per.default.queue", 1, + "A positive integer that determines the number of Tez sessions that should be\n" + + "launched on each of the queues specified by \"hive.server2.tez.default.queues\".\n" + + "Determines the parallelism on each queue."), + HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS("hive.server2.tez.initialize.default.sessions", false, + "This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" + + "turning on Tez for HiveServer2. The user could potentially want to run queries\n" + + "over Tez without the pool of sessions."), + + // Operation log configuration + HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true, + "When true, HS2 will save operation logs and make them available for clients"), + HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION("hive.server2.logging.operation.log.location", + "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator + + "operation_logs", + "Top level directory where operation logs are stored if logging functionality is enabled"), + HIVE_SERVER2_LOGGING_OPERATION_LEVEL("hive.server2.logging.operation.level", "EXECUTION", + new StringSet("NONE", "EXECUTION", "PERFORMANCE", "VERBOSE"), + "HS2 operation logging mode available to clients to be set at session level.\n" + + "For this to work, hive.server2.logging.operation.enabled should be set to true.\n" + + " NONE: Ignore any logging\n" + + " EXECUTION: Log completion of tasks\n" + + " PERFORMANCE: Execution + Performance logs \n" + + " VERBOSE: All logs" ), + + // Enable metric collection for HiveServer2 + HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."), + // http (over thrift) transport settings HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001, "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'http'."), @@ -1816,7 +1834,7 @@ public class HiveConf extends Configuration { "Keepalive time for an idle http worker thread. When the number of workers exceeds min workers, " + "excessive threads are killed after this time interval."), - // Cookie based authentication + // Cookie based authentication when using HTTP Transport HIVE_SERVER2_THRIFT_HTTP_COOKIE_AUTH_ENABLED("hive.server2.thrift.http.cookie.auth.enabled", true, "When true, HiveServer2 in HTTP transport mode, will use cookie based authentication mechanism."), HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE("hive.server2.thrift.http.cookie.max.age", "86400s", @@ -1963,6 +1981,8 @@ public class HiveConf extends Configuration { " HIVE : Exposes Hive's native table types like MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW\n" + " CLASSIC : More generic types like TABLE and VIEW"), HIVE_SERVER2_SESSION_HOOK("hive.server2.session.hook", "", ""), + + // SSL settings HIVE_SERVER2_USE_SSL("hive.server2.use.SSL", false, "Set this to true for using SSL encryption in HiveServer2."), HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", "", @@ -1983,9 +2003,6 @@ public class HiveConf extends Configuration { "Comma separated list of udfs names. These udfs will not be allowed in queries." + " The udf black list takes precedence over udf white list"), - HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", - "Comma separated list of non-SQL Hive commands users are authorized to execute"), - HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h", new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false), "The check interval for session/operation timeout, which can be disabled by setting to zero or negative value."), @@ -2002,6 +2019,8 @@ public class HiveConf extends Configuration { " This setting takes effect only if session idle timeout (hive.server2.idle.session.timeout) and checking\n" + "(hive.server2.session.check.interval) are enabled."), + HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", + "Comma separated list of non-SQL Hive commands users are authorized to execute"), HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role", "Comma separated list of configuration options which are immutable at runtime"), @@ -2127,19 +2146,6 @@ public class HiveConf extends Configuration { HIVECOUNTERGROUP("hive.counters.group.name", "HIVE", "The name of counter group for internal Hive variables (CREATED_FILE, FATAL_ERROR, etc.)"), - HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "", - "A list of comma separated values corresponding to YARN queues of the same name.\n" + - "When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" + - "for multiple Tez sessions to run in parallel on the cluster."), - HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE("hive.server2.tez.sessions.per.default.queue", 1, - "A positive integer that determines the number of Tez sessions that should be\n" + - "launched on each of the queues specified by \"hive.server2.tez.default.queues\".\n" + - "Determines the parallelism on each queue."), - HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS("hive.server2.tez.initialize.default.sessions", false, - "This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" + - "turning on Tez for HiveServer2. The user could potentially want to run queries\n" + - "over Tez without the pool of sessions."), - HIVE_QUOTEDID_SUPPORT("hive.support.quoted.identifiers", "column", new StringSet("none", "column"), "Whether to use quoted identifier. 'none' or 'column' can be used. \n" + http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index a9dac03..ba971fd 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -211,13 +211,13 @@ public class HiveConnection implements java.sql.Connection { break; } catch (TTransportException e) { LOG.info("Could not open client transport with JDBC Uri: " + jdbcUriString); - // We'll retry till we exhaust all HiveServer2 uris from ZooKeeper + // We'll retry till we exhaust all HiveServer2 nodes from ZooKeeper if ((sessConfMap.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE) != null) && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(sessConfMap .get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE)))) { try { // Update jdbcUriString, host & port variables in connParams - // Throw an exception if all HiveServer2 uris have been exhausted, + // Throw an exception if all HiveServer2 nodes have been exhausted, // or if we're unable to connect to ZooKeeper. Utils.updateConnParamsFromZooKeeper(connParams); } catch (ZooKeeperHiveClientException ze) { http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/Utils.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index 0e4693b..d8368a4 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -19,7 +19,6 @@ package org.apache.hive.jdbc; import java.net.URI; -import java.net.URISyntaxException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -37,22 +36,22 @@ import org.apache.hive.service.cli.thrift.TStatusCode; import org.apache.http.client.CookieStore; import org.apache.http.cookie.Cookie; -public class Utils { - public static final Log LOG = LogFactory.getLog(Utils.class.getName()); +class Utils { + static final Log LOG = LogFactory.getLog(Utils.class.getName()); /** * The required prefix for the connection URL. */ - public static final String URL_PREFIX = "jdbc:hive2://"; + static final String URL_PREFIX = "jdbc:hive2://"; /** * If host is provided, without a port. */ - public static final String DEFAULT_PORT = "10000"; + static final String DEFAULT_PORT = "10000"; /** * Hive's default database name */ - public static final String DEFAULT_DATABASE = "default"; + static final String DEFAULT_DATABASE = "default"; private static final String URI_JDBC_PREFIX = "jdbc:"; @@ -63,7 +62,7 @@ public class Utils { static final String HIVE_SERVER2_RETRY_TRUE = "true"; static final String HIVE_SERVER2_RETRY_FALSE = "false"; - public static class JdbcConnectionParams { + static class JdbcConnectionParams { // Note on client side parameter naming convention: // Prefer using a shorter camelCase param name instead of using the same name as the // corresponding @@ -129,7 +128,7 @@ public class Utils { static final String SSL_TRUST_STORE_TYPE = "JKS"; private String host = null; - private int port; + private int port = 0; private String jdbcUriString; private String dbName = DEFAULT_DATABASE; private Map<String,String> hiveConfs = new LinkedHashMap<String,String>(); @@ -238,17 +237,17 @@ public class Utils { } // Verify success or success_with_info status, else throw SQLException - public static void verifySuccessWithInfo(TStatus status) throws SQLException { + static void verifySuccessWithInfo(TStatus status) throws SQLException { verifySuccess(status, true); } // Verify success status, else throw SQLException - public static void verifySuccess(TStatus status) throws SQLException { + static void verifySuccess(TStatus status) throws SQLException { verifySuccess(status, false); } // Verify success and optionally with_info status, else throw SQLException - public static void verifySuccess(TStatus status, boolean withInfo) throws SQLException { + static void verifySuccess(TStatus status, boolean withInfo) throws SQLException { if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS || (withInfo && status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS)) { return; @@ -279,7 +278,7 @@ public class Utils { * @return * @throws SQLException */ - public static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException, + static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException, SQLException, ZooKeeperHiveClientException { JdbcConnectionParams connParams = new JdbcConnectionParams(); @@ -383,7 +382,6 @@ public class Utils { newUsage = usageUrlBase + JdbcConnectionParams.HTTP_PATH + "=<http_path_value>"; handleParamDeprecation(connParams.getHiveConfs(), connParams.getSessionVars(), JdbcConnectionParams.HTTP_PATH_DEPRECATED, JdbcConnectionParams.HTTP_PATH, newUsage); - // Extract host, port if (connParams.isEmbeddedMode()) { // In case of embedded mode we were supplied with an empty authority. @@ -391,23 +389,15 @@ public class Utils { connParams.setHost(jdbcURI.getHost()); connParams.setPort(jdbcURI.getPort()); } else { - // Else substitute the dummy authority with a resolved one. - // In case of dynamic service discovery using ZooKeeper, it picks a server uri from ZooKeeper - String resolvedAuthorityString = resolveAuthority(connParams); - LOG.info("Resolved authority: " + resolvedAuthorityString); - uri = uri.replace(dummyAuthorityString, resolvedAuthorityString); + // Configure host, port and params from ZooKeeper if used, + // and substitute the dummy authority with a resolved one + configureConnParams(connParams); + // We check for invalid host, port while configuring connParams with configureConnParams() + String authorityStr = connParams.getHost() + ":" + connParams.getPort(); + LOG.info("Resolved authority: " + authorityStr); + uri = uri.replace(dummyAuthorityString, authorityStr); connParams.setJdbcUriString(uri); - // Create a Java URI from the resolved URI for extracting the host/port - URI resolvedAuthorityURI = null; - try { - resolvedAuthorityURI = new URI(null, resolvedAuthorityString, null, null, null); - } catch (URISyntaxException e) { - throw new JdbcUriParseException("Bad URL format: ", e); - } - connParams.setHost(resolvedAuthorityURI.getHost()); - connParams.setPort(resolvedAuthorityURI.getPort()); } - return connParams; } @@ -471,22 +461,17 @@ public class Utils { return authorities; } - /** - * Get a string representing a specific host:port - * @param connParams - * @return - * @throws JdbcUriParseException - * @throws ZooKeeperHiveClientException - */ - private static String resolveAuthority(JdbcConnectionParams connParams) + private static void configureConnParams(JdbcConnectionParams connParams) throws JdbcUriParseException, ZooKeeperHiveClientException { String serviceDiscoveryMode = connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE); if ((serviceDiscoveryMode != null) && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER .equalsIgnoreCase(serviceDiscoveryMode))) { - // Resolve using ZooKeeper - return resolveAuthorityUsingZooKeeper(connParams); + // Set ZooKeeper ensemble in connParams for later use + connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ",")); + // Configure using ZooKeeper + ZooKeeperHiveClientHelper.configureConnParams(connParams); } else { String authority = connParams.getAuthorityList()[0]; URI jdbcURI = URI.create(URI_HIVE_PREFIX + "//" + authority); @@ -494,32 +479,28 @@ public class Utils { // to separate the 'path' portion of URI can result in this. // The missing "/" common typo while using secure mode, eg of such url - // jdbc:hive2://localhost:10000;principal=hive/hiveserver2h...@your-realm.com - if ((jdbcURI.getAuthority() != null) && (jdbcURI.getHost() == null)) { - throw new JdbcUriParseException("Bad URL format. Hostname not found " - + " in authority part of the url: " + jdbcURI.getAuthority() - + ". Are you missing a '/' after the hostname ?"); + if (jdbcURI.getAuthority() != null) { + String host = jdbcURI.getHost(); + int port = jdbcURI.getPort(); + if (host == null) { + throw new JdbcUriParseException("Bad URL format. Hostname not found " + + " in authority part of the url: " + jdbcURI.getAuthority() + + ". Are you missing a '/' after the hostname ?"); + } + // Set the port to default value; we do support jdbc url like: + // jdbc:hive2://localhost/db + if (port <= 0) { + port = Integer.parseInt(Utils.DEFAULT_PORT); + } + connParams.setHost(jdbcURI.getHost()); + connParams.setPort(jdbcURI.getPort()); } - // Return the 1st element of the array - return jdbcURI.getAuthority(); } } /** - * Read a specific host:port from ZooKeeper - * @param connParams - * @return - * @throws ZooKeeperHiveClientException - */ - private static String resolveAuthorityUsingZooKeeper(JdbcConnectionParams connParams) - throws ZooKeeperHiveClientException { - // Set ZooKeeper ensemble in connParams for later use - connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ",")); - return ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams); - } - - /** * Read the next server coordinates (host:port combo) from ZooKeeper. Ignore the znodes already - * explored. Also update the host, port, jdbcUriString fields of connParams. + * explored. Also update the host, port, jdbcUriString and other configs published by the server. * * @param connParams * @throws ZooKeeperHiveClientException @@ -528,25 +509,13 @@ public class Utils { throws ZooKeeperHiveClientException { // Add current host to the rejected list connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath()); - // Get another HiveServer2 uri from ZooKeeper - String serverUriString = ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams); - // Parse serverUri to a java URI and extract host, port - URI serverUri = null; - try { - // Note URL_PREFIX is not a valid scheme format, therefore leaving it null in the constructor - // to construct a valid URI - serverUri = new URI(null, serverUriString, null, null, null); - } catch (URISyntaxException e) { - throw new ZooKeeperHiveClientException(e); - } String oldServerHost = connParams.getHost(); int oldServerPort = connParams.getPort(); - String newServerHost = serverUri.getHost(); - int newServerPort = serverUri.getPort(); - connParams.setHost(newServerHost); - connParams.setPort(newServerPort); + // Update connection params (including host, port) from ZooKeeper + ZooKeeperHiveClientHelper.configureConnParams(connParams); connParams.setJdbcUriString(connParams.getJdbcUriString().replace( - oldServerHost + ":" + oldServerPort, newServerHost + ":" + newServerPort)); + oldServerHost + ":" + oldServerPort, connParams.getHost() + ":" + connParams.getPort())); + LOG.info("Selected HiveServer2 instance with uri: " + connParams.getJdbcUriString()); } private static String joinStringArray(String[] stringArray, String seperator) { http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java index e24b3dc..eeb3cf9 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java @@ -19,9 +19,10 @@ package org.apache.hive.jdbc; import java.nio.charset.Charset; -import java.sql.SQLException; import java.util.List; import java.util.Random; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,26 +32,19 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.zookeeper.Watcher; -public class ZooKeeperHiveClientHelper { - public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName()); - +class ZooKeeperHiveClientHelper { + static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName()); + // Pattern for key1=value1;key2=value2 + private static final Pattern kvPattern = Pattern.compile("([^=;]*)=([^;]*)[;]?"); /** * A no-op watcher class */ - public static class DummyWatcher implements Watcher { + static class DummyWatcher implements Watcher { public void process(org.apache.zookeeper.WatchedEvent event) { } } - /** - * Resolve to a host:port by connecting to ZooKeeper and picking a host randomly. - * - * @param uri - * @param connParams - * @return - * @throws SQLException - */ - static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams) + static void configureConnParams(JdbcConnectionParams connParams) throws ZooKeeperHiveClientException { String zooKeeperEnsemble = connParams.getZooKeeperEnsemble(); String zooKeeperNamespace = @@ -73,17 +67,17 @@ public class ZooKeeperHiveClientHelper { throw new ZooKeeperHiveClientException( "Tried all existing HiveServer2 uris from ZooKeeper."); } - // Now pick a host randomly + // Now pick a server node randomly serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size())); connParams.setCurrentHostZnodePath(serverNode); - String serverUri = + // Read config string from the znode for this server node + String serverConfStr = new String( zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode), Charset.forName("UTF-8")); - LOG.info("Selected HiveServer2 instance with uri: " + serverUri); - return serverUri; + applyConfs(serverConfStr, connParams); } catch (Exception e) { - throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e); + throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e); } finally { // Close the client connection with ZooKeeper if (zooKeeperClient != null) { @@ -91,4 +85,76 @@ public class ZooKeeperHiveClientHelper { } } } + + /** + * Apply configs published by the server. Configs specified from client's JDBC URI override + * configs published by the server. + * + * @param serverConfStr + * @param connParams + * @throws Exception + */ + private static void applyConfs(String serverConfStr, JdbcConnectionParams connParams) + throws Exception { + Matcher matcher = kvPattern.matcher(serverConfStr); + while (matcher.find()) { + // Have to use this if-else since switch-case on String is supported Java 7 onwards + if ((matcher.group(1) != null)) { + if ((matcher.group(2) == null)) { + throw new Exception("Null config value for: " + matcher.group(1) + + " published by the server."); + } + // Set host + if ((matcher.group(1).equals("hive.server2.thrift.bind.host")) + && (connParams.getHost() == null)) { + connParams.setHost(matcher.group(2)); + } + // Set transportMode + if ((matcher.group(1).equals("hive.server2.transport.mode")) + && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.TRANSPORT_MODE))) { + connParams.getSessionVars().put(JdbcConnectionParams.TRANSPORT_MODE, matcher.group(2)); + } + // Set port + if ((matcher.group(1).equals("hive.server2.thrift.port")) && !(connParams.getPort() > 0)) { + connParams.setPort(Integer.parseInt(matcher.group(2))); + } + if ((matcher.group(1).equals("hive.server2.thrift.http.port")) + && !(connParams.getPort() > 0)) { + connParams.setPort(Integer.parseInt(matcher.group(2))); + } + // Set sasl qop + if ((matcher.group(1).equals("hive.server2.thrift.sasl.qop")) + && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_QOP))) { + connParams.getSessionVars().put(JdbcConnectionParams.AUTH_QOP, matcher.group(2)); + } + // Set http path + if ((matcher.group(1).equals("hive.server2.thrift.http.path")) + && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.HTTP_PATH))) { + connParams.getSessionVars().put(JdbcConnectionParams.HTTP_PATH, "/" + matcher.group(2)); + } + // Set SSL + if ((matcher.group(1) != null) && (matcher.group(1).equals("hive.server2.use.SSL")) + && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.USE_SSL))) { + connParams.getSessionVars().put(JdbcConnectionParams.USE_SSL, matcher.group(2)); + } + // Set authentication configs + // Note that in JDBC driver, we have 3 auth modes: NOSASL, Kerberos and password based + // The use of "JdbcConnectionParams.AUTH_TYPE=JdbcConnectionParams.AUTH_SIMPLE" picks NOSASL + // The presence of "JdbcConnectionParams.AUTH_PRINCIPAL=<principal>" picks Kerberos + // Otherwise password based (which includes NONE, PAM, LDAP, CUSTOM) + if ((matcher.group(1).equals("hive.server2.authentication")) + && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_TYPE))) { + if (matcher.group(2).equalsIgnoreCase("NOSASL")) { + connParams.getSessionVars().put(JdbcConnectionParams.AUTH_TYPE, + JdbcConnectionParams.AUTH_SIMPLE); + } + } + // Set server's kerberos principal + if ((matcher.group(1).equals("hive.server2.authentication.kerberos.principal")) + && !(connParams.getSessionVars().containsKey(JdbcConnectionParams.AUTH_PRINCIPAL))) { + connParams.getSessionVars().put(JdbcConnectionParams.AUTH_PRINCIPAL, matcher.group(2)); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0e54991d/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 4a4be97..d7ba964 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -21,7 +21,9 @@ package org.apache.hive.service.server; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -69,6 +71,8 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.data.ACL; +import com.google.common.base.Joiner; + /** * HiveServer2. * @@ -100,7 +104,12 @@ public class HiveServer2 extends CompositeService { } addService(thriftCLIService); super.init(hiveConf); - + // Set host name in hiveConf + try { + hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getServerHost()); + } catch (Throwable t) { + throw new Error("Unable to intitialize HiveServer2", t); + } // Add a shutdown hook for catching SIGTERM & SIGINT final HiveServer2 hiveServer2 = this; Runtime.getRuntime().addShutdownHook(new Thread() { @@ -122,6 +131,14 @@ public class HiveServer2 extends CompositeService { return false; } + public static boolean isKerberosAuthMode(HiveConf hiveConf) { + String authMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION); + if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) { + return true; + } + return false; + } + /** * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory */ @@ -158,9 +175,12 @@ public class HiveServer2 extends CompositeService { private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); - String instanceURI = getServerInstanceURI(hiveConf); - byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8")); + String instanceURI = getServerInstanceURI(); setUpZooKeeperAuth(hiveConf); + // HiveServer2 configs that this instance will publish to ZooKeeper, + // so that the clients can read these and configure themselves properly. + Map<String, String> confsToPublish = new HashMap<String, String>(); + addConfsToPublish(hiveConf, confsToPublish); int sessionTimeout = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); @@ -193,6 +213,10 @@ public class HiveServer2 extends CompositeService { ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";" + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence="; + String znodeData = ""; + // Publish configs for this instance as the data on the node + znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish); + byte[] znodeDataUTF8 = znodeData.getBytes(Charset.forName("UTF-8")); znode = new PersistentEphemeralNode(zooKeeperClient, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8); @@ -220,6 +244,41 @@ public class HiveServer2 extends CompositeService { } /** + * Add conf keys, values that HiveServer2 will publish to ZooKeeper. + * @param hiveConf + */ + private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish) { + // Hostname + confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST)); + // Transport mode + confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)); + // Transport specific confs + if (isHTTPTransportMode(hiveConf)) { + confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT)); + confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); + } else { + confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_PORT)); + confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP)); + } + // Auth specific confs + confsToPublish.put(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)); + if (isKerberosAuthMode(hiveConf)) { + confsToPublish.put(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)); + } + // SSL conf + confsToPublish.put(ConfVars.HIVE_SERVER2_USE_SSL.varname, + hiveConf.getVar(ConfVars.HIVE_SERVER2_USE_SSL)); + } + + /** * For a kerberized cluster, we dynamically set up the client's JAAS conf. * * @param hiveConf @@ -289,7 +348,7 @@ public class HiveServer2 extends CompositeService { this.registeredWithZooKeeper = registeredWithZooKeeper; } - private String getServerInstanceURI(HiveConf hiveConf) throws Exception { + private String getServerInstanceURI() throws Exception { if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) { throw new Exception("Unable to get the server address; it hasn't been initialized yet."); } @@ -297,6 +356,13 @@ public class HiveServer2 extends CompositeService { + thriftCLIService.getPortNumber(); } + private String getServerHost() throws Exception { + if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) { + throw new Exception("Unable to get the server address; it hasn't been initialized yet."); + } + return thriftCLIService.getServerIPAddress().getHostName(); + } + @Override public synchronized void start() { super.start();