Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java Fri Nov 7 20:41:34 2014
@@ -18,16 +18,18 @@
 
 package org.apache.hive.service.auth;
 
-import java.io.IOException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
 import java.security.PrivilegedExceptionAction;
 
+import javax.security.auth.Subject;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.http.protocol.BasicHttpContext;
 import org.apache.http.protocol.HttpContext;
 import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
 import org.ietf.jgss.GSSManager;
 import org.ietf.jgss.GSSName;
 import org.ietf.jgss.Oid;
@@ -36,60 +38,54 @@ import org.ietf.jgss.Oid;
  * Utility functions for HTTP mode authentication.
  */
 public final class HttpAuthUtils {
-
   public static final String WWW_AUTHENTICATE = "WWW-Authenticate";
   public static final String AUTHORIZATION = "Authorization";
   public static final String BASIC = "Basic";
   public static final String NEGOTIATE = "Negotiate";
-
-  /**
-   * @return Stringified Base64 encoded kerberosAuthHeader on success
-   */
-  public static String getKerberosServiceTicket(String principal, String host, String serverHttpUrl)
-    throws IOException, InterruptedException {
-    UserGroupInformation clientUGI = getClientUGI("kerberos");
-    String serverPrincipal = getServerPrincipal(principal, host);
-    // Uses the Ticket Granting Ticket in the UserGroupInformation
-    return clientUGI.doAs(
-      new HttpKerberosClientAction(serverPrincipal, clientUGI.getUserName(), serverHttpUrl));
-  }
 
   /**
-   * Get server principal and verify that hostname is present.
-   */
-  private static String getServerPrincipal(String principal, String host) throws IOException {
-    return ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
-  }
-
-  /**
-   * JAAS login to setup the client UserGroupInformation.
-   * Sets up the Kerberos Ticket Granting Ticket,
-   * in the client UserGroupInformation object.
-   *
-   * @return Client's UserGroupInformation
+   * @return Stringified Base64 encoded kerberosAuthHeader on success
+   * @throws Exception
    */
-  public static UserGroupInformation getClientUGI(String authType) throws IOException {
-    return ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf(authType);
+  public static String getKerberosServiceTicket(String principal, String host,
+      String serverHttpUrl, boolean assumeSubject) throws Exception {
+    String serverPrincipal =
+        ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
+    if (assumeSubject) {
+      // With this option, we're assuming that the external application,
+      // using the JDBC driver has done a JAAS kerberos login already
+      AccessControlContext context = AccessController.getContext();
+      Subject subject = Subject.getSubject(context);
+      if (subject == null) {
+        throw new Exception("The Subject is not set");
+      }
+      return Subject.doAs(subject, new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
+    } else {
+      // JAAS login from ticket cache to setup the client UserGroupInformation
+      UserGroupInformation clientUGI =
+          ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf("kerberos");
+      return clientUGI.doAs(new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
+    }
   }
 
   private HttpAuthUtils() {
     throw new UnsupportedOperationException("Can't initialize class");
   }
 
+  /**
+   * We'll create an instance of this class within a doAs block so that the client's TGT credentials
+   * can be read from the Subject
+   */
   public static class HttpKerberosClientAction implements PrivilegedExceptionAction<String> {
-
     public static final String HTTP_RESPONSE = "HTTP_RESPONSE";
     public static final String SERVER_HTTP_URL = "SERVER_HTTP_URL";
     private final String serverPrincipal;
-    private final String clientUserName;
     private final String serverHttpUrl;
    private final Base64 base64codec;
    private final HttpContext httpContext;
 
-    public HttpKerberosClientAction(String serverPrincipal, String clientUserName,
-        String serverHttpUrl) {
+    public HttpKerberosClientAction(String serverPrincipal, String serverHttpUrl) {
       this.serverPrincipal = serverPrincipal;
-      this.clientUserName = clientUserName;
       this.serverHttpUrl = serverHttpUrl;
       base64codec = new Base64(0);
       httpContext = new BasicHttpContext();
@@ -102,38 +98,17 @@ public final class HttpAuthUtils {
       Oid mechOid = new Oid("1.2.840.113554.1.2.2");
       // Oid for kerberos principal name
       Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
-
       GSSManager manager = GSSManager.getInstance();
-
-      // GSS name for client
-      GSSName clientName = manager.createName(clientUserName, GSSName.NT_USER_NAME);
       // GSS name for server
       GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
-
-      // GSS credentials for client
-      GSSCredential clientCreds =
-          manager.createCredential(clientName, GSSCredential.DEFAULT_LIFETIME, mechOid,
-              GSSCredential.INITIATE_ONLY);
-
-      /*
-       * Create a GSSContext for mutual authentication with the
-       * server.
-       *    - serverName is the GSSName that represents the server.
-       *    - krb5Oid is the Oid that represents the mechanism to
-       *      use. The client chooses the mechanism to use.
-       *    - clientCreds are the client credentials
-       */
+      // Create a GSSContext for authentication with the service.
+      // We're passing client credentials as null since we want them to be read from the Subject.
       GSSContext gssContext =
-          manager.createContext(serverName, mechOid, clientCreds, GSSContext.DEFAULT_LIFETIME);
-
-      // Mutual authentication not r
+          manager.createContext(serverName, mechOid, null, GSSContext.DEFAULT_LIFETIME);
       gssContext.requestMutualAuth(false);
-
       // Establish context
       byte[] inToken = new byte[0];
-
       byte[] outToken = gssContext.initSecContext(inToken, 0, inToken.length);
-
       gssContext.dispose();
       // Base64 encoded and stringified token for server
       return new String(base64codec.encode(outToken));
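The assumeSubject=true path above deliberately skips any Hadoop-side login: it only works if the calling application has already authenticated through JAAS, because Subject.getSubject(AccessController.getContext()) can only see a Subject established by an enclosing Subject.doAs(). A minimal sketch of such a caller follows; the JAAS entry name "KerberosClient" is hypothetical, and the body of run() stands in for whatever work (for example, opening the JDBC connection) needs the Kerberos TGT:

    import java.security.PrivilegedExceptionAction;
    import javax.security.auth.Subject;
    import javax.security.auth.login.LoginContext;

    public class PreAuthenticatedClient {
      public static void main(String[] args) throws Exception {
        // "KerberosClient" is a hypothetical entry in the JAAS configuration
        // file named by -Djava.security.auth.login.config
        LoginContext loginContext = new LoginContext("KerberosClient");
        loginContext.login();
        Subject subject = loginContext.getSubject();
        // Everything inside run() sees this Subject via
        // Subject.getSubject(AccessController.getContext()), which is how
        // getKerberosServiceTicket(..., assumeSubject = true) locates the TGT.
        Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            // e.g. open the HiveServer2 JDBC connection here
            return null;
          }
        });
      }
    }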
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java Fri Nov 7 20:41:34 2014
@@ -128,6 +128,12 @@ public class OperationLog {
 
     void remove() {
       try {
+        if (in != null) {
+          in.close();
+        }
+        if (out != null) {
+          out.close();
+        }
         FileUtils.forceDelete(file);
         isRemoved = true;
       } catch (Exception e) {

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Fri Nov 7 20:41:34 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hive.service.cli.thrift;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -56,6 +58,10 @@ public class ThriftBinaryCLIService exte
       TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
       TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
       TServerSocket serverSocket = null;
+      List<String> sslVersionBlacklist = new ArrayList<String>();
+      for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
+        sslVersionBlacklist.add(sslVersion);
+      }
       if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
         serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
       } else {
@@ -67,13 +73,16 @@ public class ThriftBinaryCLIService exte
         String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
             HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
         serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
-            keyStorePassword);
+            keyStorePassword, sslVersionBlacklist);
       }
 
       // Server args
+      int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
       TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
           .processorFactory(processorFactory).transportFactory(transportFactory)
-          .protocolFactory(new TBinaryProtocol.Factory()).executorService(executorService);
+          .protocolFactory(new TBinaryProtocol.Factory())
+          .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize))
+          .executorService(executorService);
 
       // TCP Server
       server = new TThreadPoolServer(sargs);
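In binary mode the blacklist assembled above is handed to HiveAuthFactory.getServerSSLSocket(). That method's body is not part of this diff, but the usual JSSE pattern is to subtract the blacklisted names from the protocols the JVM would otherwise enable, along these lines (a sketch of that pattern, not code from this change; the exact matching rules are an assumption):

    import java.util.ArrayList;
    import java.util.List;
    import javax.net.ssl.SSLServerSocket;
    import javax.net.ssl.SSLServerSocketFactory;

    public class SslProtocolFilterSketch {
      // Keep every protocol enabled by default except the blacklisted ones.
      public static SSLServerSocket open(int port, List<String> blacklist) throws Exception {
        SSLServerSocketFactory factory =
            (SSLServerSocketFactory) SSLServerSocketFactory.getDefault();
        SSLServerSocket socket = (SSLServerSocket) factory.createServerSocket(port);
        List<String> enabled = new ArrayList<String>();
        for (String protocol : socket.getEnabledProtocols()) {
          if (!blacklist.contains(protocol)) {
            enabled.add(protocol);
          }
        }
        socket.setEnabledProtocols(enabled.toArray(new String[enabled.size()]));
        return socket;
      }
    }

The second change in this file is independent: the extra inputProtocolFactory built with TBinaryProtocol.Factory(true, true, maxMessageSize) applies a read-length limit to incoming messages only, so an oversized or malformed frame fails during deserialization rather than forcing the server into unbounded allocations.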
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Fri Nov 7 20:41:34 2014
@@ -19,7 +19,8 @@
 package org.apache.hive.service.cli.thrift;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
 import org.apache.hive.service.cli.*;
@@ -53,7 +55,7 @@ public abstract class ThriftCLIService e
 
   protected static HiveAuthFactory hiveAuthFactory;
   protected int portNum;
-  protected InetSocketAddress serverAddress;
+  protected InetAddress serverAddress;
   protected String hiveHost;
   protected TServer server;
   protected org.eclipse.jetty.server.Server httpServer;
@@ -75,13 +77,21 @@ public abstract class ThriftCLIService e
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
-
     // Initialize common server configs needed in both binary & http modes
     String portString;
     hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
     if (hiveHost == null) {
       hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
     }
+    try {
+      if (hiveHost != null && !hiveHost.isEmpty()) {
+        serverAddress = InetAddress.getByName(hiveHost);
+      } else {
+        serverAddress = InetAddress.getLocalHost();
+      }
+    } catch (UnknownHostException e) {
+      throw new ServiceException(e);
+    }
     // HTTP mode
     if (HiveServer2.isHTTPTransportMode(hiveConf)) {
       workerKeepAliveTime =
@@ -105,11 +115,6 @@ public abstract class ThriftCLIService e
         portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
       }
     }
-    if (hiveHost != null && !hiveHost.isEmpty()) {
-      serverAddress = new InetSocketAddress(hiveHost, portNum);
-    } else {
-      serverAddress = new InetSocketAddress(portNum);
-    }
     minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
     maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
     super.init(hiveConf);
@@ -148,7 +153,7 @@ public abstract class ThriftCLIService e
     return portNum;
   }
 
-  public InetSocketAddress getServerAddress() {
+  public InetAddress getServerAddress() {
     return serverAddress;
   }
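Moving the address resolution into init(), as the hunks above do, means a bad HIVE_SERVER2_THRIFT_BIND_HOST now fails when the service initializes rather than when the listener starts, and serverAddress no longer carries the port (that stays in portNum). The resolution logic in isolation, with RuntimeException standing in for the ServiceException used in the real class:

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class BindHostResolutionSketch {
      static InetAddress resolve(String hiveHost) {
        try {
          if (hiveHost != null && !hiveHost.isEmpty()) {
            return InetAddress.getByName(hiveHost); // configured bind host
          }
          return InetAddress.getLocalHost();        // fall back to the local host
        } catch (UnknownHostException e) {
          throw new RuntimeException(e); // the real code wraps this in ServiceException
        }
      }

      public static void main(String[] args) {
        System.out.println(resolve(null));        // e.g. myhost/192.168.1.10
        System.out.println(resolve("127.0.0.1")); // /127.0.0.1
      }
    }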
configured for SSL connection"); } SslContextFactory sslContextFactory = new SslContextFactory(); + String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(","); + LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols)); + sslContextFactory.addExcludeProtocols(excludedProtocols); + LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " + + Arrays.toString(sslContextFactory.getExcludeProtocols())); sslContextFactory.setKeyStorePath(keyStorePath); sslContextFactory.setKeyStorePassword(keyStorePassword); connector = new SslSelectChannelConnector(sslContextFactory); Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java Fri Nov 7 20:41:34 2014 @@ -208,11 +208,9 @@ public class ThriftHttpServlet extends T // Create a GSS context gssContext = manager.createContext(serverCreds); - // Get service ticket from the authorization header String serviceTicketBase64 = getAuthHeader(request, authType); byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes()); - gssContext.acceptSecContext(inToken, 0, inToken.length); // Authenticate or deny based on its context completion if (!gssContext.isEstablished()) { Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1637444&r1=1637443&r2=1637444&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Fri Nov 7 20:41:34 2014 @@ -32,6 +32,10 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.conf.HiveConf; @@ -53,7 +57,6 @@ import org.apache.zookeeper.WatchedEvent import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Perms; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; /** @@ -66,7 +69,7 @@ public class HiveServer2 extends Composi private CLIService cliService; private ThriftCLIService thriftCLIService; private String znodePath; - private ZooKeeper zooKeeperClient; + private CuratorFramework zooKeeperClient; private boolean registeredWithZooKeeper = false; public HiveServer2() { @@ -74,7 +77,6 @@ public class HiveServer2 extends Composi HiveConf.setLoadHiveServer2Config(true); } - @Override public 
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Fri Nov 7 20:41:34 2014
@@ -32,6 +32,10 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -53,7 +57,6 @@ import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
@@ -66,7 +69,7 @@ public class HiveServer2 extends Composi
   private CLIService cliService;
   private ThriftCLIService thriftCLIService;
   private String znodePath;
-  private ZooKeeper zooKeeperClient;
+  private CuratorFramework zooKeeperClient;
   private boolean registeredWithZooKeeper = false;
 
   public HiveServer2() {
@@ -74,7 +77,6 @@ public class HiveServer2 extends Composi
     HiveConf.setLoadHiveServer2Config(true);
   }
 
-
   @Override
   public synchronized void init(HiveConf hiveConf) {
     cliService = new CLIService(this);
@@ -109,36 +111,60 @@ public class HiveServer2 extends Composi
   }
 
   /**
+   * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
+   */
+  private final ACLProvider zooKeeperAclProvider = new ACLProvider() {
+    List<ACL> nodeAcls = new ArrayList<ACL>();
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+        // Read all to the world
+        nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
+        // Create/Delete/Write/Admin to the authenticated user
+        nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
+      } else {
+        // ACLs for znodes on a non-kerberized cluster
+        // Create/Read/Delete/Write/Admin to the world
+        nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
+      }
+      return nodeAcls;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return getDefaultAcl();
+    }
+  };
+
+  /**
    * Adds a server instance to ZooKeeper as a znode.
    *
    * @param hiveConf
    * @throws Exception
    */
   private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
-    int zooKeeperSessionTimeout =
-        hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
     String instanceURI = getServerInstanceURI(hiveConf);
     byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
-    // Znode ACLs
-    List<ACL> nodeAcls = new ArrayList<ACL>();
-    setUpAuthAndAcls(hiveConf, nodeAcls);
-    // Create a ZooKeeper client
+    setUpZooKeeperAuth(hiveConf);
+    // Create a CuratorFramework instance to be used as the ZooKeeper client
+    // Use the zooKeeperAclProvider to create appropriate ACLs
     zooKeeperClient =
-        new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
-            new ZooKeeperHiveHelper.DummyWatcher());
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .aclProvider(zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(1000, 3))
+            .build();
+    zooKeeperClient.start();
     // Create the parent znodes recursively; ignore if the parent already exists.
-    // If pre-creating the parent on a kerberized cluster, ensure that you give ACLs,
-    // as explained in {@link #setUpAuthAndAcls(HiveConf, List<ACL>) setUpAuthAndAcls}
     try {
-      ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls,
-          CreateMode.PERSISTENT);
+      zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+          .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
       LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
     } catch (KeeperException e) {
       if (e.code() != KeeperException.Code.NODEEXISTS) {
         LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
-        throw (e);
+        throw e;
       }
     }
     // Create a znode under the rootNamespace parent for this instance of the server
@@ -149,56 +175,40 @@ public class HiveServer2 extends Composi
           + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
          + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
       znodePath =
-          zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls,
-              CreateMode.EPHEMERAL_SEQUENTIAL);
+          zooKeeperClient.create().creatingParentsIfNeeded()
+              .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8);
       setRegisteredWithZooKeeper(true);
       // Set a watch on the znode
-      if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+      if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
         // No node exists, throw exception
         throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
       }
       LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
     } catch (KeeperException e) {
       LOG.fatal("Unable to create a znode for this server instance", e);
-      throw new Exception(e);
+      throw (e);
     }
   }
 
   /**
-   * Set up ACLs for znodes based on whether the cluster is secure or not.
-   * On a kerberized cluster, ZooKeeper performs Kerberos-SASL authentication.
-   * We give Read privilege to the world, but Create/Delete/Write/Admin to the authenticated user.
-   * On a non-kerberized cluster, we give Create/Read/Delete/Write/Admin privileges to the world.
+   * For a kerberized cluster, we dynamically set up the client's JAAS conf.
    *
-   * For a kerberized cluster, we also dynamically set up the client's JAAS conf.
    * @param hiveConf
-   * @param nodeAcls
    * @return
    * @throws Exception
    */
-  private void setUpAuthAndAcls(HiveConf hiveConf, List<ACL> nodeAcls) throws Exception {
+  private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
     if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
       String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
       if (principal.isEmpty()) {
-        throw new IOException(
-            "HiveServer2 Kerberos principal is empty");
+        throw new IOException("HiveServer2 Kerberos principal is empty");
       }
       String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
       if (keyTabFile.isEmpty()) {
-        throw new IOException(
-            "HiveServer2 Kerberos keytab is empty");
+        throw new IOException("HiveServer2 Kerberos keytab is empty");
      }
-      // Install the JAAS Configuration for the runtime
       ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
-      // Read all to the world
-      nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
-      // Create/Delete/Write/Admin to the authenticated user
-      nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
-    } else {
-      // ACLs for znodes on a non-kerberized cluster
-      // Create/Read/Delete/Write/Admin to the world
-      nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
     }
   }
@@ -243,7 +253,7 @@ public class HiveServer2 extends Composi
     if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) {
       throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
     }
-    return thriftCLIService.getServerAddress().getHostName() + ":"
+    return thriftCLIService.getServerAddress().getHostAddress() + ":"
         + thriftCLIService.getPortNumber();
   }
@@ -344,24 +354,25 @@ public class HiveServer2 extends Composi
    */
   static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
     HiveConf hiveConf = new HiveConf();
-    int zooKeeperSessionTimeout =
-        hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
-    ZooKeeper zooKeeperClient =
-        new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
-            new ZooKeeperHiveHelper.DummyWatcher());
-    // Get all znode paths
+    CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+    zooKeeperClient.start();
     List<String> znodePaths =
-        zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace,
-            false);
+        zooKeeperClient.getChildren().forPath(
+            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
     // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
     for (String znodePath : znodePaths) {
       if (znodePath.contains("version=" + versionNumber + ";")) {
-        zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
-            + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1);
+        LOG.info("Removing the znode: " + znodePath + " from ZooKeeper");
+        zooKeeperClient.delete().forPath(
+            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+                + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath);
      }
    }
+    zooKeeperClient.close();
   }
 
   public static void main(String[] args) {
@@ -516,8 +527,8 @@ public class HiveServer2 extends Composi
   }
 
   /**
-   * DeregisterOptionExecutor: executes the --deregister option by
-   * deregistering all HiveServer2 instances from ZooKeeper of a specific version.
+   * DeregisterOptionExecutor: executes the --deregister option by deregistering all HiveServer2
+   * instances from ZooKeeper of a specific version.
    */
   static class DeregisterOptionExecutor implements ServerOptionsExecutor {
     private final String versionNumber;
@@ -539,4 +550,3 @@ public class HiveServer2 extends Composi
     }
   }
 }
-
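The HiveServer2 changes above replace the raw ZooKeeper handle with Curator, which adds retry handling and moves the ACL policy into a single ACLProvider instead of threading an ACL list through every call. The registration pattern itself, an ephemeral-sequential znode under a persistent parent, can be exercised on its own; the connect string, namespace, and server URI below are placeholders, where HiveServer2 takes them from HIVE_ZOOKEEPER_QUORUM, HIVE_SERVER2_ZOOKEEPER_NAMESPACE, and getServerInstanceURI():

    import java.nio.charset.Charset;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class ZkRegisterSketch {
      public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("zk1:2181,zk2:2181,zk3:2181")  // placeholder ensemble
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();
        // EPHEMERAL_SEQUENTIAL makes ZooKeeper append a sequence number and
        // remove the znode when the session dies, so clients discover only
        // live HiveServer2 instances under the parent namespace.
        String path = client.create().creatingParentsIfNeeded()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath("/hiveserver2/serverUri=host:10000;version=0.14.0;sequence=",
                "host:10000".getBytes(Charset.forName("UTF-8")));
        System.out.println("Registered at " + path);
        client.close();
      }
    }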
Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Fri Nov 7 20:41:34 2014
@@ -76,9 +76,6 @@ import org.apache.hadoop.security.authen
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
 import org.apache.tez.test.MiniTezCluster;
 
 import com.google.common.base.Joiner;
@@ -90,7 +87,6 @@ import com.google.common.collect.Iterabl
  * Implemention of shims against Hadoop 0.23.0.
  */
 public class Hadoop23Shims extends HadoopShimsSecure {
-  private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
 
   HadoopShims.MiniDFSShim cluster = null;
@@ -230,22 +226,13 @@ public class Hadoop23Shims extends Hadoo
    */
   @Override
   public void refreshDefaultQueue(Configuration conf, String userName) throws IOException {
-    String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
     if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) {
-      AllocationConfiguration allocConf = new AllocationConfiguration(conf);
-      QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy();
-      if (queuePolicy != null) {
-        requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
-        if (StringUtils.isNotBlank(requestedQueue)) {
-          LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName);
-          conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
-        }
-      }
+      ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, userName);
     }
   }
 
   private boolean isFairScheduler (Configuration conf) {
-    return FairScheduler.class.getName().
+    return "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler".
      equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
   }

Modified: hive/branches/spark/shims/aggregator/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/aggregator/pom.xml?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/aggregator/pom.xml (original)
+++ hive/branches/spark/shims/aggregator/pom.xml Fri Nov 7 20:41:34 2014
@@ -63,5 +63,11 @@
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive.shims</groupId>
+      <artifactId>hive-shims-scheduler</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
   </dependencies>
 </project>

Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Fri Nov 7 20:41:34 2014
@@ -463,6 +463,16 @@ public abstract class HadoopShimsSecure
 
   @Override
   public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
+    String doAs = System.getenv("HADOOP_USER_NAME");
+    if(doAs != null && doAs.length() > 0) {
+      /*
+       * this allows doAs (proxy user) to be passed along across process boundary where
+       * delegation tokens are not supported. For example, a DDL stmt via WebHCat with
+       * a doAs parameter, forks to 'hcat' which needs to start a Session that
+       * proxies the end user
+       */
+      return UserGroupInformation.createProxyUser(doAs, UserGroupInformation.getLoginUser());
+    }
     return UserGroupInformation.getCurrentUser();
   }

Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Fri Nov 7 20:41:34 2014
@@ -62,8 +62,7 @@ public class ZooKeeperTokenStore impleme
   private String rootNode = "";
   private volatile CuratorFramework zkSession;
   private String zkConnectString;
-  private final int zkSessionTimeout = 3000;
-  private int connectTimeoutMillis = -1;
+  private int connectTimeoutMillis;
   private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
 
   /**
@@ -101,10 +100,10 @@ public class ZooKeeperTokenStore impleme
     if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
       synchronized (this) {
         if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
-          zkSession = CuratorFrameworkFactory.builder().connectString(zkConnectString)
-              .sessionTimeoutMs(zkSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
-              .aclProvider(aclDefaultProvider)
-              .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+          zkSession =
+              CuratorFrameworkFactory.builder().connectString(zkConnectString)
+                  .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
+                  .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
           zkSession.start();
         }
       }
@@ -431,12 +430,14 @@ public class ZooKeeperTokenStore impleme
   @Override
   public void init(Object objectStore, ServerMode smode) {
     this.serverMode = smode;
-    zkConnectString = conf.get(
-        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+    zkConnectString =
+        conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
     if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
       // try alternate config param
-      zkConnectString = conf.get(
-          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, null);
+      zkConnectString =
+          conf.get(
+              HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
+              null);
       if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
         throw new IllegalArgumentException("Zookeeper connect string has to be specifed through "
             + "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
@@ -445,14 +446,17 @@ public class ZooKeeperTokenStore impleme
             + WHEN_ZK_DSTORE_MSG);
       }
     }
-    connectTimeoutMillis = conf.getInt(
-        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
+    connectTimeoutMillis =
+        conf.getInt(
+            HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+            CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
     String aclStr = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
     if (StringUtils.isNotBlank(aclStr)) {
       this.newNodeAcl = parseACLs(aclStr);
     }
-    rootNode = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
-        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+    rootNode =
+        conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+            HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
 
     try {
       // Install the JAAS Configuration for the runtime
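Beyond the reindentation, the substantive ZooKeeperTokenStore change is the timeout handling: the unused zkSessionTimeout field is dropped, and when DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS is not configured the code now falls back to Curator's own default instead of -1, by asking a fresh builder:

    import org.apache.curator.framework.CuratorFrameworkFactory;

    public class CuratorDefaultTimeoutSketch {
      public static void main(String[] args) {
        // A newly created builder carries Curator's default connection timeout
        // (15000 ms in the Curator 2.x line, though the value may vary by version).
        int defaultMs = CuratorFrameworkFactory.builder().getConnectionTimeoutMs();
        System.out.println("Curator default connection timeout: " + defaultMs + " ms");
      }
    }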
Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java Fri Nov 7 20:41:34 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.shims;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.AppenderSkeleton;
@@ -33,6 +34,7 @@ public abstract class ShimLoader {
   private static JettyShims jettyShims;
   private static AppenderSkeleton eventCounter;
   private static HadoopThriftAuthBridge hadoopThriftAuthBridge;
+  private static SchedulerShim schedulerShim;
 
   /**
    * The names of the classes for shimming Hadoop for each major version.
@@ -87,6 +89,9 @@ public abstract class ShimLoader {
   }
 
+  private static final String SCHEDULER_SHIM_CLASSE =
+      "org.apache.hadoop.hive.schshim.FairSchedulerShim";
+
   /**
    * Factory method to get an instance of HadoopShims based on the
    * version of Hadoop on the classpath.
@@ -124,6 +129,13 @@ public abstract class ShimLoader {
     return hadoopThriftAuthBridge;
   }
 
+  public static synchronized SchedulerShim getSchedulerShims() {
+    if (schedulerShim == null) {
+      schedulerShim = createShim(SCHEDULER_SHIM_CLASSE, SchedulerShim.class);
+    }
+    return schedulerShim;
+  }
+
   private static <T> T loadShims(Map<String, String> classMap, Class<T> xface) {
     String vers = getMajorVersion();
     String className = classMap.get(vers);

Modified: hive/branches/spark/shims/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/pom.xml?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/pom.xml (original)
+++ hive/branches/spark/shims/pom.xml Fri Nov 7 20:41:34 2014
@@ -37,6 +37,7 @@
     <module>common-secure</module>
     <module>0.20S</module>
     <module>0.23</module>
+    <module>scheduler</module>
     <module>aggregator</module>
   </modules>
 </project>

Modified: hive/branches/spark/shims/scheduler/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/scheduler/pom.xml?rev=1637444&r1=1636884&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/scheduler/pom.xml (original)
+++ hive/branches/spark/shims/scheduler/pom.xml Fri Nov 7 20:41:34 2014
@@ -80,6 +80,7 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
       <version>${hadoop-23.version}</version>
+      <optional>true</optional>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
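The scheduler shim closes the loop on the Hadoop23Shims change earlier in this commit: the fair-scheduler queue logic now lives in the separate hive-shims-scheduler module, Hadoop23Shims reaches it through ShimLoader.getSchedulerShims(), and marking hadoop-yarn-server-resourcemanager optional keeps that heavyweight dependency off downstream classpaths. The createShim() helper referenced above is not shown in this diff; reflective loading of this kind conventionally looks like the following sketch, where the String/CharSequence pair in main() is purely illustrative:

    public final class ShimReflectionSketch {
      // Load a concrete implementation by name and cast it to the interface,
      // so the implementing class is needed on the classpath only when the
      // shim is actually requested.
      static <T> T createShim(String className, Class<T> xface) {
        try {
          return xface.cast(Class.forName(className).newInstance());
        } catch (Exception e) {
          throw new RuntimeException("Could not load shim in class " + className, e);
        }
      }

      public static void main(String[] args) {
        CharSequence s = createShim("java.lang.String", CharSequence.class);
        System.out.println("Loaded: " + s.getClass().getName() + ", length " + s.length());
      }
    }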