Modified: hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java URL: http://svn.apache.org/viewvc/hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java (original) +++ hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java Thu Oct 30 16:22:33 2014 @@ -85,77 +85,59 @@ public class TestAvroSerde { } @Test - public void noSchemaProvidedReturnsErrorSchema() throws SerDeException { + public void noSchemaProvidedThrowsException() { Properties props = new Properties(); - verifyErrorSchemaReturned(props); + verifyExpectedException(props); } @Test - public void gibberishSchemaProvidedReturnsErrorSchema() throws SerDeException { + public void gibberishSchemaProvidedReturnsErrorSchema() { Properties props = new Properties(); props.put(AvroSerdeUtils.SCHEMA_LITERAL, "blahblahblah"); - verifyErrorSchemaReturned(props); + verifyExpectedException(props); } @Test - public void emptySchemaProvidedReturnsErrorSchema() throws SerDeException { + public void emptySchemaProvidedThrowsException() { Properties props = new Properties(); props.put(AvroSerdeUtils.SCHEMA_LITERAL, ""); - verifyErrorSchemaReturned(props); + verifyExpectedException(props); } @Test - public void badSchemaURLProvidedReturnsErrorSchema() throws SerDeException { + public void badSchemaURLProvidedThrowsException() { Properties props = new Properties(); props.put(AvroSerdeUtils.SCHEMA_URL, "not://a/url"); - verifyErrorSchemaReturned(props); + verifyExpectedException(props); } @Test - public void emptySchemaURLProvidedReturnsErrorSchema() throws SerDeException { + public void emptySchemaURLProvidedThrowsException() { Properties props = new Properties(); props.put(AvroSerdeUtils.SCHEMA_URL, ""); - verifyErrorSchemaReturned(props); + verifyExpectedException(props); } @Test - public void bothPropertiesSetToNoneReturnsErrorSchema() throws SerDeException { + public void bothPropertiesSetToNoneThrowsException() { Properties props = new Properties(); props.put(AvroSerdeUtils.SCHEMA_URL, AvroSerdeUtils.SCHEMA_NONE); props.put(AvroSerdeUtils.SCHEMA_LITERAL, AvroSerdeUtils.SCHEMA_NONE); - verifyErrorSchemaReturned(props); + verifyExpectedException(props); } - private void verifyErrorSchemaReturned(Properties props) throws SerDeException { + private void verifyExpectedException(Properties props) { AvroSerDe asd = new AvroSerDe(); - SerDeUtils.initializeSerDe(asd, new Configuration(), props, null); - assertTrue(asd.getObjectInspector() instanceof StandardStructObjectInspector); - StandardStructObjectInspector oi = (StandardStructObjectInspector)asd.getObjectInspector(); - List<? extends StructField> allStructFieldRefs = oi.getAllStructFieldRefs(); - assertEquals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA.getFields().size(), allStructFieldRefs.size()); - StructField firstField = allStructFieldRefs.get(0); - assertTrue(firstField.toString().contains("error_error_error_error_error_error_error")); - - try { - Writable mock = Mockito.mock(Writable.class); - asd.deserialize(mock); - fail("Should have thrown a BadSchemaException"); - } catch (BadSchemaException bse) { - // good - } - try { - Object o = Mockito.mock(Object.class); - ObjectInspector mockOI = Mockito.mock(ObjectInspector.class); - asd.serialize(o, mockOI); - fail("Should have thrown a BadSchemaException"); - } catch (BadSchemaException bse) { + SerDeUtils.initializeSerDe(asd, new Configuration(), props, null); + fail("Expected Exception did not be thrown"); + } catch (SerDeException e) { // good } }
Modified: hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/io/TestDateWritable.java URL: http://svn.apache.org/viewvc/hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/io/TestDateWritable.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/io/TestDateWritable.java (original) +++ hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/io/TestDateWritable.java Thu Oct 30 16:22:33 2014 @@ -10,6 +10,12 @@ import java.sql.Date; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Calendar; +import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class TestDateWritable { @@ -135,4 +141,61 @@ public class TestDateWritable { private static String getRandomDateString() { return dateStrings[(int) (Math.random() * 365)]; } + + public static class DateTestCallable implements Callable<String> { + public DateTestCallable() { + } + + @Override + public String call() throws Exception { + // Iterate through each day of the year, make sure Date/DateWritable match + Date originalDate = Date.valueOf("2014-01-01"); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(originalDate.getTime()); + for (int idx = 0; idx < 365; ++idx) { + originalDate = new Date(cal.getTimeInMillis()); + // Make sure originalDate is at midnight in the local time zone, + // since DateWritable will generate dates at that time. + originalDate = Date.valueOf(originalDate.toString()); + DateWritable dateWritable = new DateWritable(originalDate); + if (!originalDate.equals(dateWritable.get())) { + return originalDate.toString(); + } + cal.add(Calendar.DAY_OF_YEAR, 1); + } + // Success! + return null; + } + } + + @Test + public void testDaylightSavingsTime() throws InterruptedException, ExecutionException { + String[] timeZones = { + "GMT", + "UTC", + "America/Godthab", + "America/Los_Angeles", + "Asia/Jerusalem", + "Australia/Melbourne", + "Europe/London", + // time zones with half hour boundaries + "America/St_Johns", + "Asia/Tehran", + }; + + for (String timeZone: timeZones) { + TimeZone previousDefault = TimeZone.getDefault(); + TimeZone.setDefault(TimeZone.getTimeZone(timeZone)); + assertEquals("Default timezone should now be " + timeZone, + timeZone, TimeZone.getDefault().getID()); + ExecutorService threadPool = Executors.newFixedThreadPool(1); + try { + Future<String> future = threadPool.submit(new DateTestCallable()); + String result = future.get(); + assertNull("Failed at timezone " + timeZone + ", date " + result, result); + } finally { + threadPool.shutdown(); TimeZone.setDefault(previousDefault); + } + } + } } Modified: hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyArrayMapStruct.java URL: http://svn.apache.org/viewvc/hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyArrayMapStruct.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyArrayMapStruct.java (original) +++ hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyArrayMapStruct.java Thu Oct 30 16:22:33 2014 @@ -194,6 +194,113 @@ public class TestLazyArrayMapStruct exte } } + /* + * test LazyMap with bad entries, e.g., empty key or empty entries + * where '[' and ']' don't exist, only for notation purpose, + * STX with value of 2 as entry separator, ETX with 3 as key/value separator + * */ + public void testLazyMapWithBadEntries() throws Throwable { + try { + { + // Map of String to String + Text nullSequence = new Text(""); + ObjectInspector oi = LazyFactory.createLazyObjectInspector( + TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get( + 0), new byte[] {'\2', '\3'}, 0, nullSequence, + false, (byte) 0); + LazyMap b = (LazyMap) LazyFactory.createLazyObject(oi); + + //read friendly string: ak[EXT]av[STX]bk[ETX]bv[STX]ck[ETX]cv[STX]dk[ETX]dv + byte[] data = new byte[] { + 'a', 'k', '\3', 'a', 'v', + '\02', 'b', 'k', '\3', 'b', 'v', + '\02', 'c', 'k', '\3', 'c', 'v', + '\02', 'd', 'k', '\3', 'd', 'v'}; + TestLazyPrimitive.initLazyObject(b, data, 0, data.length); + + assertEquals(new Text("av"), ((LazyString) b + .getMapValueElement(new Text("ak"))).getWritableObject()); + assertNull(b.getMapValueElement(new Text("-1"))); + assertEquals(new Text("bv"), ((LazyString) b + .getMapValueElement(new Text("bk"))).getWritableObject()); + assertEquals(new Text("cv"), ((LazyString) b + .getMapValueElement(new Text("ck"))).getWritableObject()); + assertNull(b.getMapValueElement(new Text("-"))); + assertEquals(new Text("dv"), ((LazyString) b + .getMapValueElement(new Text("dk"))).getWritableObject()); + assertEquals(4, b.getMapSize()); + } + + { + // Map of String to String, LazyMap allows empty-string style key, e.g., {"" : null} + // or {"", ""}, but not null style key, e.g., {null:""} + Text nullSequence = new Text(""); + ObjectInspector oi = LazyFactory.createLazyObjectInspector( + TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get( + 0), new byte[] {'\2', '\3'}, 0, nullSequence, + false, (byte) 0); + LazyMap b = (LazyMap) LazyFactory.createLazyObject(oi); + + //read friendly string: [STX]ak[EXT]av[STX]bk[ETX]bv[STX]ck[ETX]cv[STX]dk[ETX]dv + byte[] data = new byte[] { + '\02', 'a', 'k', '\3', 'a', 'v', + '\02', 'b', 'k', '\3', 'b', 'v', + '\02', 'c', 'k', '\3', 'c', 'v', + '\02', 'd', 'k', '\3', 'd', 'v'}; + TestLazyPrimitive.initLazyObject(b, data, 0, data.length); + + assertNull(b.getMapValueElement(new Text(""))); //{"" : null} + assertEquals(new Text("av"), ((LazyString) b + .getMapValueElement(new Text("ak"))).getWritableObject()); + assertNull(b.getMapValueElement(new Text("-1"))); + assertEquals(new Text("bv"), ((LazyString) b + .getMapValueElement(new Text("bk"))).getWritableObject()); + assertEquals(new Text("cv"), ((LazyString) b + .getMapValueElement(new Text("ck"))).getWritableObject()); + assertNull(b.getMapValueElement(new Text("-"))); + assertEquals(new Text("dv"), ((LazyString) b + .getMapValueElement(new Text("dk"))).getWritableObject()); + assertEquals(4, b.getMapSize()); + } + + { + // Map of String to String, LazyMap allows empty-string style key, e.g., {"" : null} + // or {"", ""}, but not null style key, e.g., {null:""} + Text nullSequence = new Text(""); + ObjectInspector oi = LazyFactory.createLazyObjectInspector( + TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get( + 0), new byte[] {'\2', '\3'}, 0, nullSequence, + false, (byte) 0); + LazyMap b = (LazyMap) LazyFactory.createLazyObject(oi); + + //read friendly string: [ETX][STX]ak[EXT]av[STX]bk[ETX]bv[STX]ck[ETX]cv[STX]dk[ETX]dv + byte[] data = new byte[] { + '\03', + '\02', 'a', 'k', '\3', 'a', 'v', + '\02', 'b', 'k', '\3', 'b', 'v', + '\02', 'c', 'k', '\3', 'c', 'v', + '\02', 'd', 'k', '\3', 'd', 'v'}; + TestLazyPrimitive.initLazyObject(b, data, 0, data.length); + + assertNull(b.getMapValueElement(new Text(""))); //{"" : null} + assertEquals(new Text("av"), ((LazyString) b + .getMapValueElement(new Text("ak"))).getWritableObject()); + assertNull(b.getMapValueElement(new Text("-1"))); + assertEquals(new Text("bv"), ((LazyString) b + .getMapValueElement(new Text("bk"))).getWritableObject()); + assertEquals(new Text("cv"), ((LazyString) b + .getMapValueElement(new Text("ck"))).getWritableObject()); + assertNull(b.getMapValueElement(new Text("-"))); + assertEquals(new Text("dv"), ((LazyString) b + .getMapValueElement(new Text("dk"))).getWritableObject()); + assertEquals(4, b.getMapSize()); + } + } catch(Throwable e) { + e.printStackTrace(); + throw e; + } + } + /** * Test the LazyMap class. */ Modified: hive/branches/spark/service/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/service/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/pom.xml (original) +++ hive/branches/spark/service/pom.xml Thu Oct 30 16:22:33 2014 @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>0.14.0-SNAPSHOT</version> + <version>0.15.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java Thu Oct 30 16:22:33 2014 @@ -29,10 +29,11 @@ import javax.security.sasl.Sasl; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.thrift.TProcessorFactory; @@ -98,7 +99,7 @@ public class HiveAuthFactory { conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)); // start delegation token manager try { - saslServer.startDelegationTokenSecretManager(conf, null); + saslServer.startDelegationTokenSecretManager(conf, null, ServerMode.HIVESERVER2); } catch (IOException e) { throw new TTransportException("Failed to start token manager", e); } @@ -234,65 +235,72 @@ public class HiveAuthFactory { // retrieve delegation token for the given user public String getDelegationToken(String owner, String renewer) throws HiveSQLException { if (saslServer == null) { - throw new HiveSQLException("Delegation token only supported over kerberos authentication"); + throw new HiveSQLException( + "Delegation token only supported over kerberos authentication", "08S01"); } try { String tokenStr = saslServer.getDelegationTokenWithService(owner, renewer, HS2_CLIENT_TOKEN); if (tokenStr == null || tokenStr.isEmpty()) { - throw new HiveSQLException("Received empty retrieving delegation token for user " + owner); + throw new HiveSQLException( + "Received empty retrieving delegation token for user " + owner, "08S01"); } return tokenStr; } catch (IOException e) { - throw new HiveSQLException("Error retrieving delegation token for user " + owner, e); + throw new HiveSQLException( + "Error retrieving delegation token for user " + owner, "08S01", e); } catch (InterruptedException e) { - throw new HiveSQLException("delegation token retrieval interrupted", e); + throw new HiveSQLException("delegation token retrieval interrupted", "08S01", e); } } // cancel given delegation token public void cancelDelegationToken(String delegationToken) throws HiveSQLException { if (saslServer == null) { - throw new HiveSQLException("Delegation token only supported over kerberos authentication"); + throw new HiveSQLException( + "Delegation token only supported over kerberos authentication", "08S01"); } try { saslServer.cancelDelegationToken(delegationToken); } catch (IOException e) { - throw new HiveSQLException("Error canceling delegation token " + delegationToken, e); + throw new HiveSQLException( + "Error canceling delegation token " + delegationToken, "08S01", e); } } public void renewDelegationToken(String delegationToken) throws HiveSQLException { if (saslServer == null) { - throw new HiveSQLException("Delegation token only supported over kerberos authentication"); + throw new HiveSQLException( + "Delegation token only supported over kerberos authentication", "08S01"); } try { saslServer.renewDelegationToken(delegationToken); } catch (IOException e) { - throw new HiveSQLException("Error renewing delegation token " + delegationToken, e); + throw new HiveSQLException( + "Error renewing delegation token " + delegationToken, "08S01", e); } } public String getUserFromToken(String delegationToken) throws HiveSQLException { if (saslServer == null) { - throw new HiveSQLException("Delegation token only supported over kerberos authentication"); + throw new HiveSQLException( + "Delegation token only supported over kerberos authentication", "08S01"); } try { return saslServer.getUserFromToken(delegationToken); } catch (IOException e) { - throw new HiveSQLException("Error extracting user from delegation token " + delegationToken, - e); + throw new HiveSQLException( + "Error extracting user from delegation token " + delegationToken, "08S01", e); } } public static void verifyProxyAccess(String realUser, String proxyUser, String ipAddress, HiveConf hiveConf) throws HiveSQLException { - try { UserGroupInformation sessionUgi; if (ShimLoader.getHadoopShims().isSecurityEnabled()) { - KerberosName kerbName = new KerberosName(realUser); - String shortPrincipalName = kerbName.getServiceName(); + KerberosNameShim kerbName = ShimLoader.getHadoopShims().getKerberosNameShim(realUser); + String shortPrincipalName = kerbName.getServiceName(); sessionUgi = ShimLoader.getHadoopShims().createProxyUser(shortPrincipalName); } else { sessionUgi = ShimLoader.getHadoopShims().createRemoteUser(realUser, null); @@ -303,8 +311,8 @@ public class HiveAuthFactory { } } catch (IOException e) { throw new HiveSQLException( - "Failed to validate proxy privilege of " + realUser + " for " + proxyUser, e); + "Failed to validate proxy privilege of " + realUser + " for " + proxyUser, "08S01", e); } } - + } Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java Thu Oct 30 16:22:33 2014 @@ -44,6 +44,7 @@ import org.apache.hive.service.auth.Hive import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.HiveServer2; /** * CLIService. @@ -64,15 +65,18 @@ public class CLIService extends Composit private SessionManager sessionManager; private UserGroupInformation serviceUGI; private UserGroupInformation httpUGI; + // The HiveServer2 instance running this service + private final HiveServer2 hiveServer2; - public CLIService() { + public CLIService(HiveServer2 hiveServer2) { super(CLIService.class.getSimpleName()); + this.hiveServer2 = hiveServer2; } @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; - sessionManager = new SessionManager(); + sessionManager = new SessionManager(hiveServer2); addService(sessionManager); // If the hadoop cluster is secure, do a kerberos login for the service from the keytab if (ShimLoader.getHadoopShims().isSecurityEnabled()) { @@ -201,7 +205,8 @@ public class CLIService extends Composit * @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle) */ @Override - public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { + public void closeSession(SessionHandle sessionHandle) + throws HiveSQLException { sessionManager.closeSession(sessionHandle); LOG.debug(sessionHandle + ": closeSession()"); } Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Thu Oct 30 16:22:33 2014 @@ -35,14 +35,16 @@ import org.apache.hadoop.hive.common.cli import org.apache.hadoop.hive.common.cli.IHiveFileProcessor; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.exec.FetchFormatter; import org.apache.hadoop.hive.ql.exec.ListSinkOperator; import org.apache.hadoop.hive.ql.history.HiveHistory; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.SetProcessor; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.*; @@ -80,7 +82,6 @@ public class HiveSessionImpl implements private SessionManager sessionManager; private OperationManager operationManager; - private IMetaStoreClient metastoreClient = null; private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>(); private boolean isOperationLogEnabled; private File sessionLogDir; @@ -95,6 +96,17 @@ public class HiveSessionImpl implements this.hiveConf = new HiveConf(serverhiveConf); this.ipAddress = ipAddress; + try { + // In non-impersonation mode, map scheduler queue to current user + // if fair scheduler is configured. + if (! hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && + hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) { + ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username); + } + } catch (IOException e) { + LOG.warn("Error setting scheduler queue: " + e, e); + } + // Set an explicit session name to control the download directory name hiveConf.set(ConfVars.HIVESESSIONID.varname, sessionHandle.getHandleIdentifier().toString()); @@ -315,14 +327,13 @@ public class HiveSessionImpl implements @Override public IMetaStoreClient getMetaStoreClient() throws HiveSQLException { - if (metastoreClient == null) { - try { - metastoreClient = new HiveMetaStoreClient(getHiveConf()); - } catch (MetaException e) { - throw new HiveSQLException(e); - } + try { + return Hive.get(getHiveConf()).getMSC(); + } catch (HiveException e) { + throw new HiveSQLException("Failed to get metastore connection", e); + } catch (MetaException e) { + throw new HiveSQLException("Failed to get metastore connection", e); } - return metastoreClient; } @Override @@ -538,14 +549,6 @@ public class HiveSessionImpl implements public void close() throws HiveSQLException { try { acquire(true); - /** - * For metadata operations like getTables(), getColumns() etc, - * the session allocates a private metastore handler which should be - * closed at the end of the session - */ - if (metastoreClient != null) { - metastoreClient.close(); - } // Iterate through the opHandles and close their operations for (OperationHandle opHandle : opHandleSet) { operationManager.closeOperation(opHandle); Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Thu Oct 30 16:22:33 2014 @@ -43,6 +43,7 @@ import org.apache.hive.service.cli.HiveS import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.thrift.TProtocolVersion; +import org.apache.hive.service.server.HiveServer2; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; /** @@ -65,9 +66,12 @@ public class SessionManager extends Comp private long sessionTimeout; private volatile boolean shutdown; + // The HiveServer2 instance running this service + private final HiveServer2 hiveServer2; - public SessionManager() { + public SessionManager(HiveServer2 hiveServer2) { super(SessionManager.class.getSimpleName()); + this.hiveServer2 = hiveServer2; } @Override @@ -232,10 +236,10 @@ public class SessionManager extends Comp /** * Opens a new session and creates a session handle. * The username passed to this method is the effective username. - * If withImpersonation is true (==doAs true) we wrap all the calls in HiveSession + * If withImpersonation is true (==doAs true) we wrap all the calls in HiveSession * within a UGI.doAs, where UGI corresponds to the effective user. - * @see org.apache.hive.service.cli.thrift.ThriftCLIService#getUserName() - * + * @see org.apache.hive.service.cli.thrift.ThriftCLIService#getUserName() + * * @param protocol * @param username * @param password @@ -288,6 +292,24 @@ public class SessionManager extends Comp throw new HiveSQLException("Session does not exist!"); } session.close(); + // Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions + if (!(hiveServer2 == null) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) + && (!hiveServer2.isRegisteredWithZooKeeper())) { + // Asynchronously shutdown this instance of HiveServer2, + // if there are no active client sessions + if (getOpenSessionCount() == 0) { + LOG.info("This instance of HiveServer2 has been removed from the list of server " + + "instances available for dynamic service discovery. " + + "The last client session has ended - will shutdown now."); + Thread shutdownThread = new Thread() { + @Override + public void run() { + hiveServer2.stop(); + } + }; + shutdownThread.start(); + } + } } public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException { @@ -376,6 +398,5 @@ public class SessionManager extends Comp public int getOpenSessionCount() { return handleToSession.size(); } - } Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java Thu Oct 30 16:22:33 2014 @@ -30,7 +30,7 @@ import org.apache.hive.service.cli.ICLIS public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService { public EmbeddedThriftBinaryCLIService() { - super(new CLIService()); + super(new CLIService(null)); isEmbedded = true; HiveConf.setLoadHiveServer2Config(true); cliService.init(new HiveConf()); Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Thu Oct 30 16:22:33 2014 @@ -66,7 +66,6 @@ public abstract class ThriftCLIService e protected int minWorkerThreads; protected int maxWorkerThreads; protected long workerKeepAliveTime; - private HiveServer2 hiveServer2; public ThriftCLIService(CLIService cliService, String serviceName) { super(serviceName); @@ -264,9 +263,9 @@ public abstract class ThriftCLIService e /** * Returns the effective username. - * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user + * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user * 2. If hive.server2.allow.user.substitution = true: the username of the end user, - * that the connecting user is trying to proxy for. + * that the connecting user is trying to proxy for. * This includes a check whether the connecting user is allowed to proxy for the end user. * @param req * @return @@ -366,24 +365,6 @@ public abstract class ThriftCLIService e } catch (Exception e) { LOG.warn("Error closing session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); - } finally { - if (!(isEmbedded) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) - && (!hiveServer2.isRegisteredWithZooKeeper())) { - // Asynchronously shutdown this instance of HiveServer2, - // if there are no active client sessions - if (cliService.getSessionManager().getOpenSessionCount() == 0) { - LOG.info("This instance of HiveServer2 has been removed from the list of server " - + "instances available for dynamic service discovery. " - + "The last client session has ended - will shutdown now."); - Thread shutdownThread = new Thread() { - @Override - public void run() { - hiveServer2.stop(); - } - }; - shutdownThread.start(); - } - } } return resp; } @@ -666,10 +647,4 @@ public abstract class ThriftCLIService e return cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION) .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString()); } - - public void setHiveServer2(HiveServer2 hiveServer2) { - this.hiveServer2 = hiveServer2; - } - } - Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Thu Oct 30 16:22:33 2014 @@ -29,12 +29,10 @@ import org.apache.hadoop.hive.shims.Shim import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.thrift.TCLIService.Iface; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.apache.thrift.TProcessor; -import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServlet; @@ -60,9 +58,6 @@ public class ThriftHttpCLIService extend @Override public void run() { try { - // Verify config validity - verifyHttpConfiguration(hiveConf); - // HTTP Server httpServer = new org.eclipse.jetty.server.Server(); @@ -162,32 +157,4 @@ public class ThriftHttpCLIService extend } return httpPath; } - - /** - * Verify that this configuration is supported by transportMode of HTTP - * @param hiveConf - */ - private static void verifyHttpConfiguration(HiveConf hiveConf) { - String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION); - - // Error out if KERBEROS auth mode is being used and use SSL is also set to true - if(authType.equalsIgnoreCase(AuthTypes.KERBEROS.toString()) && - hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { - String msg = ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting of " + - authType + " is not supported with " + - ConfVars.HIVE_SERVER2_USE_SSL + " set to true"; - LOG.fatal(msg); - throw new RuntimeException(msg); - } - - // Warn that SASL is not used in http mode - if(authType.equalsIgnoreCase(AuthTypes.NONE.toString())) { - // NONE in case of thrift mode uses SASL - LOG.warn(ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting to " + - authType + ". SASL is not supported with http transport mode," + - " so using equivalent of " - + AuthTypes.NOSASL); - } - } - } Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java Thu Oct 30 16:22:33 2014 @@ -31,8 +31,9 @@ import org.apache.commons.codec.binary.B import org.apache.commons.codec.binary.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hive.service.auth.AuthenticationProviderFactory; import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods; import org.apache.hive.service.auth.HiveAuthFactory; @@ -237,19 +238,31 @@ public class ThriftHttpServlet extends T } } - private String getPrincipalWithoutRealm(String fullPrincipal) { - KerberosName fullKerberosName = new KerberosName(fullPrincipal); + private String getPrincipalWithoutRealm(String fullPrincipal) + throws HttpAuthenticationException { + KerberosNameShim fullKerberosName; + try { + fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal); + } catch (IOException e) { + throw new HttpAuthenticationException(e); + } String serviceName = fullKerberosName.getServiceName(); - String hostName = fullKerberosName.getHostName(); + String hostName = fullKerberosName.getHostName(); String principalWithoutRealm = serviceName; if (hostName != null) { principalWithoutRealm = serviceName + "/" + hostName; } return principalWithoutRealm; } - - private String getPrincipalWithoutRealmAndHost(String fullPrincipal) { - KerberosName fullKerberosName = new KerberosName(fullPrincipal); + + private String getPrincipalWithoutRealmAndHost(String fullPrincipal) + throws HttpAuthenticationException { + KerberosNameShim fullKerberosName; + try { + fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal); + } catch (IOException e) { + throw new HttpAuthenticationException(e); + } return fullKerberosName.getServiceName(); } } Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original) +++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Thu Oct 30 16:22:33 2014 @@ -18,8 +18,18 @@ package org.apache.hive.service.server; +import java.io.IOException; import java.nio.charset.Charset; - +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.LogUtils; @@ -29,10 +39,10 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.CompositeService; -import org.apache.hive.service.ServiceException; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; @@ -42,7 +52,9 @@ import org.apache.zookeeper.KeeperExcept import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; /** * HiveServer2. @@ -65,7 +77,7 @@ public class HiveServer2 extends Composi @Override public synchronized void init(HiveConf hiveConf) { - cliService = new CLIService(); + cliService = new CLIService(this); addService(cliService); if (isHTTPTransportMode(hiveConf)) { thriftCLIService = new ThriftHttpCLIService(cliService); @@ -73,7 +85,6 @@ public class HiveServer2 extends Composi thriftCLIService = new ThriftBinaryCLIService(cliService); } addService(thriftCLIService); - thriftCLIService.setHiveServer2(this); super.init(hiveConf); // Add a shutdown hook for catching SIGTERM & SIGINT @@ -110,14 +121,19 @@ public class HiveServer2 extends Composi String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); String instanceURI = getServerInstanceURI(hiveConf); byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8")); + // Znode ACLs + List<ACL> nodeAcls = new ArrayList<ACL>(); + setUpAuthAndAcls(hiveConf, nodeAcls); + // Create a ZooKeeper client zooKeeperClient = new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, new ZooKeeperHiveHelper.DummyWatcher()); - - // Create the parent znodes recursively; ignore if the parent already exists + // Create the parent znodes recursively; ignore if the parent already exists. + // If pre-creating the parent on a kerberized cluster, ensure that you give ACLs, + // as explained in {@link #setUpAuthAndAcls(HiveConf, List<ACL>) setUpAuthAndAcls} try { - ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls, + CreateMode.PERSISTENT); LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2"); } catch (KeeperException e) { if (e.code() != KeeperException.Code.NODEEXISTS) { @@ -126,14 +142,14 @@ public class HiveServer2 extends Composi } } // Create a znode under the rootNamespace parent for this instance of the server - // Znode name: server-host:port-versionInfo-sequence + // Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber try { - String znodePath = + String pathPrefix = ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace - + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "server-" + instanceURI + "-" - + HiveVersionInfo.getVersion() + "-"; + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";" + + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence="; znodePath = - zooKeeperClient.create(znodePath, znodeDataUTF8, Ids.OPEN_ACL_UNSAFE, + zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls, CreateMode.EPHEMERAL_SEQUENTIAL); setRegisteredWithZooKeeper(true); // Set a watch on the znode @@ -149,11 +165,50 @@ public class HiveServer2 extends Composi } /** + * Set up ACLs for znodes based on whether the cluster is secure or not. + * On a kerberized cluster, ZooKeeper performs Kerberos-SASL authentication. + * We give Read privilege to the world, but Create/Delete/Write/Admin to the authenticated user. + * On a non-kerberized cluster, we give Create/Read/Delete/Write/Admin privileges to the world. + * + * For a kerberized cluster, we also dynamically set up the client's JAAS conf. + * @param hiveConf + * @param nodeAcls + * @return + * @throws Exception + */ + private void setUpAuthAndAcls(HiveConf hiveConf, List<ACL> nodeAcls) throws Exception { + if (ShimLoader.getHadoopShims().isSecurityEnabled()) { + String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL); + if (principal.isEmpty()) { + throw new IOException( + "HiveServer2 Kerberos principal is empty"); + } + String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); + if (keyTabFile.isEmpty()) { + throw new IOException( + "HiveServer2 Kerberos keytab is empty"); + } + + // Install the JAAS Configuration for the runtime + ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keyTabFile); + // Read all to the world + nodeAcls.addAll(Ids.READ_ACL_UNSAFE); + // Create/Delete/Write/Admin to the authenticated user + nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS)); + } else { + // ACLs for znodes on a non-kerberized cluster + // Create/Read/Delete/Write/Admin to the world + nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE); + } + } + + /** * The watcher class which sets the de-register flag when the znode corresponding to this server * instance is deleted. Additionally, it shuts down the server if there are no more active client * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper. */ private class DeRegisterWatcher implements Watcher { + @Override public void process(WatchedEvent event) { if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) { HiveServer2.this.setRegisteredWithZooKeeper(false); @@ -233,6 +288,7 @@ public class HiveServer2 extends Composi private static void startHiveServer2() throws Throwable { long attempts = 0, maxAttempts = 1; while (true) { + LOG.info("Starting HiveServer2"); HiveConf hiveConf = new HiveConf(); maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS); HiveServer2 server = null; @@ -280,31 +336,206 @@ public class HiveServer2 extends Composi } } + /** + * Remove all znodes corresponding to the given version number from ZooKeeper + * + * @param versionNumber + * @throws Exception + */ + static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception { + HiveConf hiveConf = new HiveConf(); + int zooKeeperSessionTimeout = + hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); + String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); + ZooKeeper zooKeeperClient = + new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, + new ZooKeeperHiveHelper.DummyWatcher()); + // Get all znode paths + List<String> znodePaths = + zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace, + false); + // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper + for (String znodePath : znodePaths) { + if (znodePath.contains("version=" + versionNumber + ";")) { + zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1); + } + } + } + public static void main(String[] args) { HiveConf.setLoadHiveServer2Config(true); try { ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2"); - if (!oproc.process(args)) { - System.err.println("Error starting HiveServer2 with given arguments"); - System.exit(-1); - } + ServerOptionsProcessorResponse oprocResponse = oproc.parse(args); // NOTE: It is critical to do this here so that log4j is reinitialized // before any of the other core hive classes are loaded String initLog4jMessage = LogUtils.initHiveLog4j(); LOG.debug(initLog4jMessage); - HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG); - // log debug message from "oproc" after log4j initialize properly + + // Log debug message from "oproc" after log4j initialize properly LOG.debug(oproc.getDebugMessage().toString()); - startHiveServer2(); + // Call the executor which will execute the appropriate command based on the parsed options + oprocResponse.getServerOptionsExecutor().execute(); } catch (LogInitializationException e) { LOG.error("Error initializing log: " + e.getMessage(), e); System.exit(-1); - } catch (Throwable t) { - LOG.fatal("Error starting HiveServer2", t); - System.exit(-1); + } + } + + /** + * ServerOptionsProcessor. + * Process arguments given to HiveServer2 (-hiveconf property=value) + * Set properties in System properties + * Create an appropriate response object, + * which has executor to execute the appropriate command based on the parsed options. + */ + static class ServerOptionsProcessor { + private final Options options = new Options(); + private org.apache.commons.cli.CommandLine commandLine; + private final String serverName; + private final StringBuilder debugMessage = new StringBuilder(); + + @SuppressWarnings("static-access") + ServerOptionsProcessor(String serverName) { + this.serverName = serverName; + // -hiveconf x=y + options.addOption(OptionBuilder + .withValueSeparator() + .hasArgs(2) + .withArgName("property=value") + .withLongOpt("hiveconf") + .withDescription("Use value for given property") + .create()); + // -deregister <versionNumber> + options.addOption(OptionBuilder + .hasArgs(1) + .withArgName("versionNumber") + .withLongOpt("deregister") + .withDescription("Deregister all instances of given version from dynamic service discovery") + .create()); + options.addOption(new Option("H", "help", false, "Print help information")); + } + + ServerOptionsProcessorResponse parse(String[] argv) { + try { + commandLine = new GnuParser().parse(options, argv); + // Process --hiveconf + // Get hiveconf param values and set the System property values + Properties confProps = commandLine.getOptionProperties("hiveconf"); + for (String propKey : confProps.stringPropertyNames()) { + // save logging message for log4j output latter after log4j initialize properly + debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n"); + System.setProperty(propKey, confProps.getProperty(propKey)); + } + + // Process --help + if (commandLine.hasOption('H')) { + return new ServerOptionsProcessorResponse(new HelpOptionExecutor(serverName, options)); + } + + // Process --deregister + if (commandLine.hasOption("deregister")) { + return new ServerOptionsProcessorResponse(new DeregisterOptionExecutor( + commandLine.getOptionValue("deregister"))); + } + } catch (ParseException e) { + // Error out & exit - we were not able to parse the args successfully + System.err.println("Error starting HiveServer2 with given arguments: "); + System.err.println(e.getMessage()); + System.exit(-1); + } + // Default executor, when no option is specified + return new ServerOptionsProcessorResponse(new StartOptionExecutor()); + } + + StringBuilder getDebugMessage() { + return debugMessage; + } + } + + /** + * The response sent back from {@link ServerOptionsProcessor#parse(String[])} + */ + static class ServerOptionsProcessorResponse { + private final ServerOptionsExecutor serverOptionsExecutor; + + ServerOptionsProcessorResponse(ServerOptionsExecutor serverOptionsExecutor) { + this.serverOptionsExecutor = serverOptionsExecutor; + } + + ServerOptionsExecutor getServerOptionsExecutor() { + return serverOptionsExecutor; + } + } + + /** + * The executor interface for running the appropriate HiveServer2 command based on parsed options + */ + static interface ServerOptionsExecutor { + public void execute(); + } + + /** + * HelpOptionExecutor: executes the --help option by printing out the usage + */ + static class HelpOptionExecutor implements ServerOptionsExecutor { + private final Options options; + private final String serverName; + + HelpOptionExecutor(String serverName, Options options) { + this.options = options; + this.serverName = serverName; + } + + @Override + public void execute() { + new HelpFormatter().printHelp(serverName, options); + System.exit(0); + } + } + + /** + * StartOptionExecutor: starts HiveServer2. + * This is the default executor, when no option is specified. + */ + static class StartOptionExecutor implements ServerOptionsExecutor { + @Override + public void execute() { + try { + startHiveServer2(); + } catch (Throwable t) { + LOG.fatal("Error starting HiveServer2", t); + System.exit(-1); + } + } + } + + /** + * DeregisterOptionExecutor: executes the --deregister option by + * deregistering all HiveServer2 instances from ZooKeeper of a specific version. + */ + static class DeregisterOptionExecutor implements ServerOptionsExecutor { + private final String versionNumber; + + DeregisterOptionExecutor(String versionNumber) { + this.versionNumber = versionNumber; + } + + @Override + public void execute() { + try { + deleteServerInstancesFromZooKeeper(versionNumber); + } catch (Exception e) { + LOG.fatal("Error deregistering HiveServer2 instances for version: " + versionNumber + + " from ZooKeeper", e); + System.exit(-1); + } + System.exit(0); } } } Modified: hive/branches/spark/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java (original) +++ hive/branches/spark/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java Thu Oct 30 16:22:33 2014 @@ -39,7 +39,7 @@ public class TestPlainSaslHelper extends hconf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)); - CLIService cliService = new CLIService(); + CLIService cliService = new CLIService(null); cliService.init(hconf); ThriftCLIService tcliService = new ThriftBinaryCLIService(cliService); tcliService.init(hconf); Modified: hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java (original) +++ hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java Thu Oct 30 16:22:33 2014 @@ -52,7 +52,7 @@ public class TestSessionGlobalInitFile e */ private class FakeEmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService { public FakeEmbeddedThriftBinaryCLIService(HiveConf hiveConf) { - super(new CLIService()); + super(new CLIService(null)); isEmbedded = true; cliService.init(hiveConf); cliService.start(); Modified: hive/branches/spark/service/src/test/org/apache/hive/service/server/TestServerOptionsProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/server/TestServerOptionsProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/service/src/test/org/apache/hive/service/server/TestServerOptionsProcessor.java (original) +++ hive/branches/spark/service/src/test/org/apache/hive/service/server/TestServerOptionsProcessor.java Thu Oct 30 16:22:33 2014 @@ -21,6 +21,8 @@ package org.apache.hive.service.server; import org.junit.Assert; import org.junit.Test; +import org.apache.hive.service.server.HiveServer2.ServerOptionsProcessor; + /** * Test ServerOptionsProcessor * @@ -39,17 +41,12 @@ public class TestServerOptionsProcessor null, System.getProperty(key)); + optProcessor.parse(args); - boolean isSuccess = optProcessor.process(args); - Assert.assertTrue("options processor result", isSuccess); Assert.assertEquals( "checking system property after processing options", value, System.getProperty(key)); - - - - } } Modified: hive/branches/spark/shims/0.20/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/0.20/pom.xml (original) +++ hive/branches/spark/shims/0.20/pom.xml Thu Oct 30 16:22:33 2014 @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>0.14.0-SNAPSHOT</version> + <version>0.15.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> Modified: hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original) +++ hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Oct 30 16:22:33 2014 @@ -619,6 +619,12 @@ public class Hadoop20Shims implements Ha } @Override + public String getResolvedPrincipal(String principal) throws IOException { + // Not supported + return null; + } + + @Override public void reLoginUserFromKeytab() throws IOException{ throwKerberosUnsupportedError(); } @@ -698,7 +704,7 @@ public class Hadoop20Shims implements Ha } public class Hadoop20FileStatus implements HdfsFileStatus { - private FileStatus fileStatus; + private final FileStatus fileStatus; public Hadoop20FileStatus(FileStatus fileStatus) { this.fileStatus = fileStatus; } @@ -706,6 +712,7 @@ public class Hadoop20Shims implements Ha public FileStatus getFileStatus() { return fileStatus; } + @Override public void debugLog() { if (fileStatus != null) { LOG.debug(fileStatus.toString()); @@ -824,6 +831,11 @@ public class Hadoop20Shims implements Ha } @Override + public void refreshDefaultQueue(Configuration conf, String userName) { + // MR1 does not expose API required to set MR queue mapping for user + } + + @Override public String getTokenFileLocEnvName() { throw new UnsupportedOperationException( "Kerberos not supported in current hadoop version"); @@ -928,4 +940,15 @@ public class Hadoop20Shims implements Ha public Path getCurrentTrashPath(Configuration conf, FileSystem fs) { return null; } + + @Override + public KerberosNameShim getKerberosNameShim(String name) throws IOException { + // Not supported + return null; + } + + @Override + public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) { + // Not supported + } } Modified: hive/branches/spark/shims/0.20S/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/0.20S/pom.xml (original) +++ hive/branches/spark/shims/0.20S/pom.xml Thu Oct 30 16:22:33 2014 @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>0.14.0-SNAPSHOT</version> + <version>0.15.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> Modified: hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original) +++ hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Oct 30 16:22:33 2014 @@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.security.KerberosName; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.VersionInfo; @@ -158,6 +159,11 @@ public class Hadoop20SShims extends Hado } @Override + public void refreshDefaultQueue(Configuration conf, String userName) { + // MR1 does not expose API required to set MR queue mapping for user + } + + @Override public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){ TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile); } @@ -546,4 +552,44 @@ public class Hadoop20SShims extends Hado public Path getCurrentTrashPath(Configuration conf, FileSystem fs) { return null; } + + /** + * Returns a shim to wrap KerberosName + */ + @Override + public KerberosNameShim getKerberosNameShim(String name) throws IOException { + return new KerberosNameShim(name); + } + + /** + * Shim for KerberosName + */ + public class KerberosNameShim implements HadoopShimsSecure.KerberosNameShim { + + private KerberosName kerberosName; + + public KerberosNameShim(String name) { + kerberosName = new KerberosName(name); + } + + public String getDefaultRealm() { + return kerberosName.getDefaultRealm(); + } + + public String getServiceName() { + return kerberosName.getServiceName(); + } + + public String getHostName() { + return kerberosName.getHostName(); + } + + public String getRealm() { + return kerberosName.getRealm(); + } + + public String getShortName() throws IOException { + return kerberosName.getShortName(); + } + } } Modified: hive/branches/spark/shims/0.23/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/0.23/pom.xml (original) +++ hive/branches/spark/shims/0.23/pom.xml Thu Oct 30 16:22:33 2014 @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>0.14.0-SNAPSHOT</version> + <version>0.15.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> @@ -120,6 +120,11 @@ <optional>true</optional> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-resourcemanager</artifactId> + <version>${hadoop-23.version}</version> + </dependency> + <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-tests</artifactId> <version>${tez.version}</version> Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original) +++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Oct 30 16:22:33 2014 @@ -72,8 +72,13 @@ import org.apache.hadoop.mapreduce.TaskT import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy; import org.apache.tez.test.MiniTezCluster; import com.google.common.base.Joiner; @@ -85,6 +90,7 @@ import com.google.common.collect.Iterabl * Implemention of shims against Hadoop 0.23.0. */ public class Hadoop23Shims extends HadoopShimsSecure { + private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename"; HadoopShims.MiniDFSShim cluster = null; @@ -220,6 +226,30 @@ public class Hadoop23Shims extends Hadoo } /** + * Load the fair scheduler queue for given user if available. + */ + @Override + public void refreshDefaultQueue(Configuration conf, String userName) throws IOException { + String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME; + if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) { + AllocationConfiguration allocConf = new AllocationConfiguration(conf); + QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy(); + if (queuePolicy != null) { + requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName); + if (StringUtils.isNotBlank(requestedQueue)) { + LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName); + conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue); + } + } + } + } + + private boolean isFairScheduler (Configuration conf) { + return FairScheduler.class.getName(). + equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER)); + } + + /** * Returns a shim to wrap MiniMrCluster */ @Override @@ -847,4 +877,44 @@ public class Hadoop23Shims extends Hadoo TrashPolicy tp = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory()); return tp.getCurrentTrashDir(); } + + /** + * Returns a shim to wrap KerberosName + */ + @Override + public KerberosNameShim getKerberosNameShim(String name) throws IOException { + return new KerberosNameShim(name); + } + + /** + * Shim for KerberosName + */ + public class KerberosNameShim implements HadoopShimsSecure.KerberosNameShim { + + private KerberosName kerberosName; + + public KerberosNameShim(String name) { + kerberosName = new KerberosName(name); + } + + public String getDefaultRealm() { + return kerberosName.getDefaultRealm(); + } + + public String getServiceName() { + return kerberosName.getServiceName(); + } + + public String getHostName() { + return kerberosName.getHostName(); + } + + public String getRealm() { + return kerberosName.getRealm(); + } + + public String getShortName() throws IOException { + return kerberosName.getShortName(); + } + } } Modified: hive/branches/spark/shims/aggregator/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/aggregator/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/aggregator/pom.xml (original) +++ hive/branches/spark/shims/aggregator/pom.xml Thu Oct 30 16:22:33 2014 @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>0.14.0-SNAPSHOT</version> + <version>0.15.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> Modified: hive/branches/spark/shims/common-secure/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/common-secure/pom.xml (original) +++ hive/branches/spark/shims/common-secure/pom.xml Thu Oct 30 16:22:33 2014 @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>0.14.0-SNAPSHOT</version> + <version>0.15.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> @@ -74,6 +74,11 @@ <version>${libthrift.version}</version> </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original) +++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Thu Oct 30 16:22:33 2014 @@ -29,11 +29,14 @@ import java.security.PrivilegedException import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; -import javax.security.auth.login.LoginException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; @@ -66,6 +69,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -73,6 +77,7 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.tools.HadoopArchives; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ToolRunner; +import org.apache.zookeeper.client.ZooKeeperSaslClient; import com.google.common.primitives.Longs; @@ -88,6 +93,7 @@ public abstract class HadoopShimsSecure return HtmlQuoting.unquoteHtmlChars(item); } + @Override public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() { return new CombineFileInputFormatShim() { @Override @@ -171,6 +177,7 @@ public abstract class HadoopShimsSecure protected boolean isShrinked; protected long shrinkedLength; + @Override public boolean next(K key, V value) throws IOException { while ((curReader == null) @@ -183,11 +190,13 @@ public abstract class HadoopShimsSecure return true; } + @Override public K createKey() { K newKey = curReader.createKey(); return (K)(new CombineHiveKey(newKey)); } + @Override public V createValue() { return curReader.createValue(); } @@ -195,10 +204,12 @@ public abstract class HadoopShimsSecure /** * Return the amount of data processed. */ + @Override public long getPos() throws IOException { return progress; } + @Override public void close() throws IOException { if (curReader != null) { curReader.close(); @@ -209,6 +220,7 @@ public abstract class HadoopShimsSecure /** * Return progress based on the amount of data processed so far. */ + @Override public float getProgress() throws IOException { return Math.min(1.0f, progress / (float) (split.getLength())); } @@ -309,6 +321,7 @@ public abstract class HadoopShimsSecure CombineFileInputFormat<K, V> implements HadoopShims.CombineFileInputFormatShim<K, V> { + @Override public Path[] getInputPathsShim(JobConf conf) { try { return FileInputFormat.getInputPaths(conf); @@ -339,7 +352,7 @@ public abstract class HadoopShimsSecure super.setMaxSplitSize(minSize); } - InputSplit[] splits = (InputSplit[]) super.getSplits(job, numSplits); + InputSplit[] splits = super.getSplits(job, numSplits); ArrayList<InputSplitShim> inputSplitShims = new ArrayList<InputSplitShim>(); for (int pos = 0; pos < splits.length; pos++) { @@ -359,10 +372,12 @@ public abstract class HadoopShimsSecure return inputSplitShims.toArray(new InputSplitShim[inputSplitShims.size()]); } + @Override public InputSplitShim getInputSplitShim() throws IOException { return new InputSplitShim(); } + @Override public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split, Reporter reporter, Class<RecordReader<K, V>> rrClass) @@ -373,6 +388,7 @@ public abstract class HadoopShimsSecure } + @Override public String getInputFormatClassName() { return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"; } @@ -401,6 +417,7 @@ public abstract class HadoopShimsSecure * the archive as compared to the full path in case of earlier versions. * See this api in Hadoop20Shims for comparison. */ + @Override public URI getHarUri(URI original, URI base, URI originalBase) throws URISyntaxException { URI relative = originalBase.relativize(original); @@ -431,6 +448,7 @@ public abstract class HadoopShimsSecure public void abortTask(TaskAttemptContext taskContext) { } } + @Override public void prepareJobOutput(JobConf conf) { conf.setOutputCommitter(NullOutputCommitter.class); @@ -573,6 +591,17 @@ public abstract class HadoopShimsSecure return UserGroupInformation.loginUserFromKeytabAndReturnUGI(hostPrincipal, keytabFile); } + /** + * Convert Kerberos principal name pattern to valid Kerberos principal names. + * @param principal (principal name pattern) + * @return + * @throws IOException + */ + @Override + public String getResolvedPrincipal(String principal) throws IOException { + return SecurityUtil.getServerPrincipal(principal, "0.0.0.0"); + } + @Override public String getTokenFileLocEnvName() { return UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; @@ -675,4 +704,58 @@ public abstract class HadoopShimsSecure throws IOException, AccessControlException, Exception { DefaultFileAccess.checkFileAccess(fs, stat, action); } + + @Override + public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException { + // ZooKeeper property name to pick the correct JAAS conf section + final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient"; + System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME); + + principal = getResolvedPrincipal(principal); + JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile); + + // Install the Configuration in the runtime. + javax.security.auth.login.Configuration.setConfiguration(jaasConf); + } + + /** + * A JAAS configuration for ZooKeeper clients intended to use for SASL + * Kerberos. + */ + private static class JaasConfiguration extends javax.security.auth.login.Configuration { + // Current installed Configuration + private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration + .getConfiguration(); + private final String loginContextName; + private final String principal; + private final String keyTabFile; + + public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) { + this.loginContextName = hiveLoginContextName; + this.principal = principal; + this.keyTabFile = keyTabFile; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { + if (loginContextName.equals(appName)) { + Map<String, String> krbOptions = new HashMap<String, String>(); + krbOptions.put("doNotPrompt", "true"); + krbOptions.put("storeKey", "true"); + krbOptions.put("useKeyTab", "true"); + krbOptions.put("principal", principal); + krbOptions.put("keyTab", keyTabFile); + krbOptions.put("refreshKrb5Config", "true"); + AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions); + return new AppConfigurationEntry[] { hiveZooKeeperClientEntry }; + } + // Try the base config + if (baseConfig != null) { + return baseConfig.getAppConfigurationEntry(appName); + } + return null; + } + } + } Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java (original) +++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java Thu Oct 30 16:22:33 2014 @@ -25,6 +25,7 @@ import java.util.List; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport; @@ -108,18 +109,17 @@ public class DBTokenStore implements Del return delTokenIdents; } - private Object hmsHandler; + private Object rawStore; @Override - public void setStore(Object hms) throws TokenStoreException { - hmsHandler = hms; + public void init(Object rawStore, ServerMode smode) throws TokenStoreException { + this.rawStore = rawStore; } private Object invokeOnRawStore(String methName, Object[] params, Class<?> ... paramTypes) throws TokenStoreException{ try { - Object rawStore = hmsHandler.getClass().getMethod("getMS").invoke(hmsHandler); return rawStore.getClass().getMethod(methName, paramTypes).invoke(rawStore, params); } catch (IllegalArgumentException e) { throw new TokenStoreException(e); @@ -149,5 +149,4 @@ public class DBTokenStore implements Del // No-op. } - } Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java (original) +++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java Thu Oct 30 16:22:33 2014 @@ -21,6 +21,7 @@ import java.io.Closeable; import java.util.List; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; /** @@ -108,6 +109,10 @@ public interface DelegationTokenStore ex */ List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException; - void setStore(Object hmsHandler) throws TokenStoreException; + /** + * @param hmsHandler ObjectStore used by DBTokenStore + * @param smode Indicate whether this is a metastore or hiveserver2 token store + */ + void init(Object hmsHandler, ServerMode smode); } Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original) +++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Thu Oct 30 16:22:33 2014 @@ -308,6 +308,10 @@ public class HadoopThriftAuthBridge20S e "hive.cluster.delegation.token.store.class"; public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = "hive.cluster.delegation.token.store.zookeeper.connectString"; + // alternate connect string specification configuration + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE = + "hive.zookeeper.quorum"; + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = @@ -315,7 +319,7 @@ public class HadoopThriftAuthBridge20S e public static final String DELEGATION_TOKEN_STORE_ZK_ACL = "hive.cluster.delegation.token.store.zookeeper.acl"; public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = - "/hive/cluster/delegation"; + "/hivedelegation"; public Server() throws TTransportException { try { @@ -417,7 +421,7 @@ public class HadoopThriftAuthBridge20S e } @Override - public void startDelegationTokenSecretManager(Configuration conf, Object hms) + public void startDelegationTokenSecretManager(Configuration conf, Object rawStore, ServerMode smode) throws IOException{ long secretKeyInterval = conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, @@ -430,7 +434,7 @@ public class HadoopThriftAuthBridge20S e DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); DelegationTokenStore dts = getTokenStore(conf); - dts.setStore(hms); + dts.init(rawStore, smode); secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (original) +++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Thu Oct 30 16:22:33 2014 @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHa import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; /** @@ -108,8 +109,7 @@ public class MemoryTokenStore implements } @Override - public void setStore(Object hmsHandler) throws TokenStoreException { + public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException { // no-op } - } Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (original) +++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Thu Oct 30 16:22:33 2014 @@ -265,6 +265,7 @@ public class TokenStoreDelegationTokenSe /** * Extension of rollMasterKey to remove expired keys from store. + * * @throws IOException */ protected void rollMasterKeyExt() throws IOException { @@ -273,18 +274,21 @@ public class TokenStoreDelegationTokenSe HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this); List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys()); for (DelegationKey key : keysAfterRoll) { - keys.remove(key.getKeyId()); - if (key.getKeyId() == currentKeyId) { - tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); - } + keys.remove(key.getKeyId()); + if (key.getKeyId() == currentKeyId) { + tokenStore.updateMasterKey(currentKeyId, encodeWritable(key)); + } } for (DelegationKey expiredKey : keys.values()) { LOGGER.info("Removing expired key id={}", expiredKey.getKeyId()); - tokenStore.removeMasterKey(expiredKey.getKeyId()); + try { + tokenStore.removeMasterKey(expiredKey.getKeyId()); + } catch (Exception e) { + LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e); + } } } - /** * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access * restriction (there would not be an need to clone the remove thread if the remove logic was