This is an automated email from the ASF dual-hosted git repository. ychena pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new b7da71856b HIVE-21456: Thrift over Http for Hive Metastore (#3105) (Sourabh Goyal, reviewed by Sai Gantasala, Peter Vary, Naveen and Yongzhi) b7da71856b is described below commit b7da71856b1bb51af68a5ba6890b65f9843f3606 Author: Sourabh Goyal <soura...@cloudera.com> AuthorDate: Wed Apr 20 06:33:37 2022 -0700 HIVE-21456: Thrift over Http for Hive Metastore (#3105) (Sourabh Goyal, reviewed by Sai Gantasala, Peter Vary, Naveen and Yongzhi) * [WIP]HIVE-21456: Thrift over Http for Hive Metastore Change-Id: Ie610b7351fe6279353c1f781b0602da0f1860443 * Addresses review comments. Also fixes build failure Change-Id: Idc8dc3448156e7e2715dc9ea979edf007d4d53d4 * fixes test failures Change-Id: Ibf6210985248f88cf7011b048703e95fd99dee49 * Refactors creation of Binary and HTTP clients in separate methods in HiveMetastoreClient Change-Id: Ib080e24eede76104e10458343f85ac746022f16d * Addresses review comments Change-Id: I5ec4fb201bd65bc358c38160348b200fc16d730c * Fixes validation of maxIdleTimeout in metastore http server Change-Id: I52990b3904cd8d42da9cff9f282e1f099323e3d7 * Disabled HTTP TRACE in embedded jetty server in HMS Change-Id: Idcdec4ee0ff7d3ded67816cca4505627a1e5b33b * Addresses review comments Change-Id: Ie046c512f2095b0d71743a9485620e369dc75b17 * Addresses nits. 
Adds some more comments Change-Id: I39b50cd549af62e5d460fa99167c5aab221edaf8 --- .../java/org/hadoop/hive/jdbc/SSLTestUtils.java | 6 + .../test/java/org/apache/hive/jdbc/TestSSL.java | 24 +- .../hadoop/hive/metastore/HiveMetaStoreClient.java | 352 +++++++++++++-------- .../hadoop/hive/metastore/conf/MetastoreConf.java | 27 ++ .../hive/metastore/utils/MetaStoreUtils.java | 25 ++ .../hadoop/hive/metastore/utils/SecurityUtils.java | 46 +++ .../src/test/resources/log4j2.properties | 11 +- .../hadoop/hive/metastore/HiveMetaStore.java | 285 +++++++++++++++-- .../hive/metastore/HmsThriftHttpServlet.java | 113 +++++++ ...Store.java => TestRemoteHiveHttpMetaStore.java} | 49 +-- .../hive/metastore/TestRemoteHiveMetaStore.java | 3 + standalone-metastore/pom.xml | 6 + 12 files changed, 752 insertions(+), 195 deletions(-) diff --git a/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java b/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java index b8e7e3de65..3917a3b457 100644 --- a/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java +++ b/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java @@ -67,6 +67,12 @@ public class SSLTestUtils { KEY_STORE_TRUST_STORE_PASSWORD); } + public static void setMetastoreHttpsConf(HiveConf conf) { + setMetastoreSslConf(conf); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.THRIFT_TRANSPORT_MODE, "http"); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE, "http"); + } + public static void clearSslConfOverlay(Map<String, String> confOverlay) { confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname, "false"); } diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java index 1d170ec309..ec6c65f75a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java 
@@ -53,7 +53,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@org.junit.Ignore("HIVE-22620") public class TestSSL { private static final Logger LOG = LoggerFactory.getLogger(TestSSL.class); @@ -65,6 +64,7 @@ public class TestSSL { private static final String JAVA_TRUST_STORE_PROP = "javax.net.ssl.trustStore"; private static final String JAVA_TRUST_STORE_PASS_PROP = "javax.net.ssl.trustStorePassword"; private static final String JAVA_TRUST_STORE_TYPE_PROP = "javax.net.ssl.trustStoreType"; + private static final String KEY_MANAGER_FACTORY_ALGORITHM = "SunX509"; private MiniHS2 miniHS2 = null; private static HiveConf conf = new HiveConf(); @@ -290,6 +290,7 @@ public class TestSSL { * Test SSL client connection to SSL server * @throws Exception */ + @Ignore @Test public void testSSLConnectionWithProperty() throws Exception { SSLTestUtils.setSslConfOverlay(confOverlay); @@ -390,6 +391,7 @@ public class TestSSL { * Opening a new connection with this wrong certificate should fail * @throws Exception */ + @Ignore @Test public void testConnectionWrongCertCN() throws Exception { // This call sets the default ssl params including the correct keystore in the server config @@ -437,15 +439,34 @@ public class TestSSL { * Test HMS server with SSL * @throws Exception */ + @Ignore @Test public void testMetastoreWithSSL() throws Exception { testSSLHMS(true); } + /** + * Test HMS server with Thrift over Http + SSL + * @throws Exception + */ + @Test + public void testMetastoreWithHttps() throws Exception { + SSLTestUtils.setMetastoreHttpsConf(conf); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM, + KEY_MANAGER_FACTORY_ALGORITHM); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_TYPE, KEY_STORE_TRUST_STORE_TYPE); + // false flag in testSSLHMS will set key store type for metastore + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM, + KEY_MANAGER_FACTORY_ALGORITHM); + + 
testSSLHMS(false); + } + /** * Test HMS server with SSL with input keystore type * @throws Exception */ + @Ignore @Test public void testMetastoreWithSSLKeyStoreType() throws Exception { testSSLHMS(false); @@ -511,6 +532,7 @@ public class TestSSL { * Test SSL client connection to SSL server * @throws Exception */ + @Ignore @Test public void testSSLConnectionWithKeystoreType() throws Exception { SSLTestUtils.setSslConfOverlay(confOverlay); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 81f6083ce4..f1d06d1d34 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -78,12 +78,18 @@ import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpRequestInterceptor; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.protocol.HttpContext; import org.apache.thrift.TApplicationException; import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.THttpClient; import org.apache.thrift.transport.layered.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; @@ -594,6 +600,99 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { 
client.rename_partition_req(req); } + private THttpClient createHttpClient(URI store, boolean useSSL) throws MetaException, + TTransportException { + String path = MetaStoreUtils.getHttpPath(MetastoreConf.getVar(conf, ConfVars.THRIFT_HTTP_PATH)); + String httpUrl = (useSSL ? "https://" : "http://") + store.getHost() + ":" + store.getPort() + path; + + String user = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME); + if (user == null || user.equals("")) { + try { + LOG.debug("No username passed in config " + ConfVars.METASTORE_CLIENT_PLAIN_USERNAME.getHiveName() + + ". Trying to get the current user from UGI" ); + user = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + throw new MetaException("Failed to get client username from UGI"); + } + } + final String httpUser = user; + THttpClient tHttpClient; + HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); + httpClientBuilder.addInterceptorFirst(new HttpRequestInterceptor() { + @Override + public void process(HttpRequest httpRequest, HttpContext httpContext) + throws HttpException, IOException { + httpRequest.addHeader(MetaStoreUtils.USER_NAME_HTTP_HEADER, httpUser); + } + }); + + try { + if (useSSL) { + String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim(); + if (trustStorePath.isEmpty()) { + throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH + + " Not configured for SSL connection"); + } + String trustStorePassword = + MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD); + String trustStoreType = + MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim(); + String trustStoreAlgorithm = + MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim(); + tHttpClient = SecurityUtils.getThriftHttpsClient(httpUrl, trustStorePath, trustStorePassword, + trustStoreAlgorithm, trustStoreType, httpClientBuilder); + } else { + tHttpClient = new 
THttpClient(httpUrl, httpClientBuilder.build()); + } + } catch (Exception e) { + if (e instanceof TTransportException) { + throw (TTransportException)e; + } else { + throw new MetaException("Failed to create http transport client to url: " + httpUrl + + ". Error:" + e); + } + } + LOG.debug("Created thrift http client for URL: " + httpUrl); + return tHttpClient; + } + + private TTransport createBinaryClient(URI store, boolean useSSL) throws TTransportException, + MetaException { + TTransport binaryTransport = null; + try { + int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf, + ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); + if (useSSL) { + String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim(); + if (trustStorePath.isEmpty()) { + throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH + + " Not configured for SSL connection"); + } + String trustStorePassword = + MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD); + String trustStoreType = + MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim(); + String trustStoreAlgorithm = + MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim(); + binaryTransport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, + trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm); + } else { + binaryTransport = new TSocket(new TConfiguration(),store.getHost(), store.getPort(), + clientSocketTimeout); + } + binaryTransport = createAuthBinaryTransport(store, binaryTransport); + } catch (Exception e) { + if (e instanceof TTransportException) { + throw (TTransportException)e; + } else { + throw new MetaException("Failed to create binary transport client to url: " + store + + ". 
Error: " + e); + } + } + LOG.debug("Created thrift binary client for URI: " + store); + return binaryTransport; + } + private void open() throws MetaException { isConnected = false; TTransportException tte = null; @@ -602,10 +701,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { boolean useSasl = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL); String clientAuthMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_AUTH_MODE); boolean usePasswordAuth = false; - boolean useFramedTransport = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_FRAMED_TRANSPORT); boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL); - int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf, - ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); + String transportMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE); + boolean isHttpTransportMode = transportMode.equalsIgnoreCase("http"); if (clientAuthMode != null) { usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode); @@ -613,118 +711,18 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { for (URI store : metastoreUris) { - LOG.info("Trying to connect to metastore with URI ({})", store); - + LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store, + transportMode); try { - if (useSSL) { - try { - String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim(); - if (trustStorePath.isEmpty()) { - throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH - + " Not configured for SSL connection"); - } - String trustStorePassword = - MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD); - String trustStoreType = - MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim(); - String trustStoreAlgorithm = - 
MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim(); - - // Create an SSL socket and connect - transport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, - trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm); - final int newCount = connCount.incrementAndGet(); - LOG.debug( - "Opened an SSL connection to metastore, current connections: {}", - newCount); - if (LOG.isTraceEnabled()) { - LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]", - System.identityHashCode(this), new Exception()); - } - } catch (IOException e) { - throw new IllegalArgumentException(e); - } catch (TTransportException e) { - tte = e; - throw new MetaException(e.toString()); - } - } else { - try { - transport = new TSocket(new TConfiguration(),store.getHost(), store.getPort(), clientSocketTimeout); - } catch (TTransportException e) { - tte = e; - throw new MetaException(e.toString()); - } - } - - if (usePasswordAuth) { - // we are using PLAIN Sasl connection with user/password - LOG.debug("HMSC::open(): Creating plain authentication thrift connection."); - String userName = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME); - - if (null == userName || userName.isEmpty()) { - throw new MetaException("No user specified for plain transport."); - } - - // The password is not directly provided. It should be obtained from a keystore pointed - // by configuration "hadoop.security.credential.provider.path". 
- try { - String passwd = null; - char[] pwdCharArray = conf.getPassword(userName); - if (null != pwdCharArray) { - passwd = new String(pwdCharArray); - } - if (null == passwd) { - throw new MetaException("No password found for user " + userName); - } - // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL) - transport = MetaStorePlainSaslHelper.getPlainTransport(userName, passwd, transport); - } catch (IOException | TTransportException sasle) { - // IOException covers SaslException - LOG.error("Could not create client transport", sasle); - throw new MetaException(sasle.toString()); - } - } else if (useSasl) { - // Wrap thrift connection with SASL for secure connection. - try { - HadoopThriftAuthBridge.Client authBridge = - HadoopThriftAuthBridge.getBridge().createClient(); - - // check if we should use delegation tokens to authenticate - // the call below gets hold of the tokens if they are set up by hadoop - // this should happen on the map/reduce tasks if the client added the - // tokens into hadoop's credential store in the front end during job - // submission. - String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE); - // tokenSig could be null - tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig); - - if (tokenStrForm != null) { - LOG.debug("HMSC::open(): Found delegation token. Creating DIGEST-based thrift connection."); - // authenticate using delegation tokens via the "DIGEST" mechanism - transport = authBridge.createClientTransport(null, store.getHost(), - "DIGEST", tokenStrForm, transport, - MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL)); - } else { - LOG.debug("HMSC::open(): Could not find delegation token. 
Creating KERBEROS-based thrift connection."); - String principalConfig = - MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL); - transport = authBridge.createClientTransport( - principalConfig, store.getHost(), "KERBEROS", null, - transport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL)); - } - } catch (IOException ioe) { - LOG.error("Failed to create client transport", ioe); - throw new MetaException(ioe.toString()); - } - } else { - if (useFramedTransport) { - try { - transport = new TFramedTransport(transport); - } catch (TTransportException e) { - LOG.error("Failed to create client transport", e); - throw new MetaException(e.toString()); - } + try { + if (isHttpTransportMode) { + transport = createHttpClient(store, useSSL); + } else { + transport = createBinaryClient(store, useSSL); } + } catch (TTransportException te) { + tte = te; + throw new MetaException(te.toString()); } final TProtocol protocol; @@ -738,23 +736,33 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { if (!transport.isOpen()) { transport.open(); final int newCount = connCount.incrementAndGet(); - LOG.info("Opened a connection to metastore, URI ({}) " - + "current connections: {}", store, newCount); - if (LOG.isTraceEnabled()) { - LOG.trace("METASTORE CONNECTION TRACE - open [{}]", - System.identityHashCode(this), new Exception()); + if (useSSL) { + LOG.info( + "Opened an SSL connection to metastore, current connections: {}", + newCount); + if (LOG.isTraceEnabled()) { + LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]", + System.identityHashCode(this), new Exception()); + } + } else { + LOG.info("Opened a connection to metastore, URI ({}) " + + "current connections: {}", store, newCount); + if (LOG.isTraceEnabled()) { + LOG.trace("METASTORE CONNECTION TRACE - open [{}]", + System.identityHashCode(this), new Exception()); + } } } isConnected = true; } catch (TTransportException e) { tte = e; - LOG.warn("Failed to connect to the MetaStore Server 
URI ({})", - store); - LOG.debug("Failed to connect to the MetaStore Server URI ({})", - store, e); + String errMsg = String.format("Failed to connect to the MetaStore Server URI (%s) in %s " + + "transport mode", store, transportMode); + LOG.warn(errMsg); + LOG.debug(errMsg, e); } - if (isConnected && !useSasl && !usePasswordAuth && + if (isConnected && !useSasl && !usePasswordAuth && !isHttpTransportMode && MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)) { // Call set_ugi, only in unsecure mode. try { @@ -773,8 +781,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } } catch (MetaException e) { recentME = e; - LOG.error("Failed to connect to metastore with URI (" + store - + ") in attempt " + attempt, e); + String errMsg = "Failed to connect to metastore with URI (" + store + + ") transport mode:" + transportMode + " in attempt " + attempt; + LOG.error(errMsg, e); } if (isConnected) { break; @@ -806,6 +815,99 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { snapshotActiveConf(); } + // wraps the underlyingTransport in the appropriate transport based on mode of authentication + private TTransport createAuthBinaryTransport(URI store, TTransport underlyingTransport) + throws MetaException { + boolean isHttpTransportMode = + MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE). 
+ equalsIgnoreCase("http"); + Preconditions.checkArgument(!isHttpTransportMode); + Preconditions.checkNotNull(underlyingTransport, "Underlying transport should not be null"); + // default transport is the underlying one + TTransport transport = underlyingTransport; + boolean useFramedTransport = + MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_FRAMED_TRANSPORT); + boolean useSSL = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL); + boolean useSasl = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL); + String clientAuthMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_AUTH_MODE); + boolean usePasswordAuth = false; + + if (clientAuthMode != null) { + usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode); + } + if (usePasswordAuth) { + // we are using PLAIN Sasl connection with user/password + LOG.debug("HMSC::open(): Creating plain authentication thrift connection."); + String userName = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME); + + if (null == userName || userName.isEmpty()) { + throw new MetaException("No user specified for plain transport."); + } + + // The password is not directly provided. It should be obtained from a keystore pointed + // by configuration "hadoop.security.credential.provider.path". + try { + String passwd = null; + char[] pwdCharArray = conf.getPassword(userName); + if (null != pwdCharArray) { + passwd = new String(pwdCharArray); + } + if (null == passwd) { + throw new MetaException("No password found for user " + userName); + } + // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL) + transport = MetaStorePlainSaslHelper.getPlainTransport(userName, passwd, underlyingTransport); + } catch (IOException | TTransportException sasle) { + // IOException covers SaslException + LOG.error("Could not create client transport", sasle); + throw new MetaException(sasle.toString()); + } + } else if (useSasl) { + // Wrap thrift connection with SASL for secure connection. 
+ try { + HadoopThriftAuthBridge.Client authBridge = + HadoopThriftAuthBridge.getBridge().createClient(); + + // check if we should use delegation tokens to authenticate + // the call below gets hold of the tokens if they are set up by hadoop + // this should happen on the map/reduce tasks if the client added the + // tokens into hadoop's credential store in the front end during job + // submission. + String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE); + // tokenSig could be null + tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig); + + if (tokenStrForm != null) { + LOG.debug("HMSC::open(): Found delegation token. Creating DIGEST-based thrift connection."); + // authenticate using delegation tokens via the "DIGEST" mechanism + transport = authBridge.createClientTransport(null, store.getHost(), + "DIGEST", tokenStrForm, underlyingTransport, + MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL)); + } else { + LOG.debug("HMSC::open(): Could not find delegation token. 
Creating KERBEROS-based thrift connection."); + String principalConfig = + MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL); + transport = authBridge.createClientTransport( + principalConfig, store.getHost(), "KERBEROS", null, + underlyingTransport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL)); + } + } catch (IOException ioe) { + LOG.error("Failed to create client transport", ioe); + throw new MetaException(ioe.toString()); + } + } else { + if (useFramedTransport) { + try { + transport = new TFramedTransport(transport); + } catch (TTransportException e) { + LOG.error("Failed to create client transport", e); + throw new MetaException(e.toString()); + } + } + } + return transport; + } + private void snapshotActiveConf() { currentMetaVars = new HashMap<>(MetastoreConf.metaVars.length); for (ConfVars oneVar : MetastoreConf.metaVars) { diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 9cc8d04ceb..51a46e8d42 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1364,6 +1364,13 @@ public class MetastoreConf { "Comma-separated list of tasks that will be started in separate threads. These will be" + " started only when the metastore is running as a separate service. They must " + "implement " + METASTORE_TASK_THREAD_CLASS), + THRIFT_TRANSPORT_MODE("metastore.server.thrift.transport.mode", + "hive.metastore.server.thrift.transport.mode", "binary", + "Transport mode for thrift server in Metastore. 
Can be binary or http"), + THRIFT_HTTP_PATH("metastore.server.thrift.http.path", + "hive.metastore.server.thrift.http.path", + "metastore", + "Path component of URL endpoint when in HTTP mode"), TCP_KEEP_ALIVE("metastore.server.tcp.keepalive", "hive.metastore.server.tcp.keepalive", true, "Whether to enable TCP keepalive for the metastore server. Keepalive will prevent accumulation of half-open connections."), @@ -1504,6 +1511,26 @@ public class MetastoreConf { USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false, "Comma separated list of users who are in admin role for bootstrapping.\n" + "More users can be added in ADMIN role later."), + // TODO: Should we have a separate config for the metastoreclient or THRIFT_TRANSPORT_MODE + // would suffice ? + METASTORE_CLIENT_THRIFT_TRANSPORT_MODE("metastore.client.thrift.transport.mode", + "hive.metastore.client.thrift.transport.mode", "binary", + "Transport mode to be used by the metastore client. It should be the same as " + THRIFT_TRANSPORT_MODE), + METASTORE_CLIENT_THRIFT_HTTP_PATH("metastore.client.thrift.http.path", + "hive.metastore.client.thrift.http.path", + "metastore", + "Path component of URL endpoint when in HTTP mode"), + METASTORE_THRIFT_HTTP_REQUEST_HEADER_SIZE("metastore.server.thrift.http.request.header.size", + "hive.metastore.server.thrift.http.request.header.size", 6*1024, + "Request header size in bytes when using HTTP transport mode for metastore thrift server." + + " Defaults to jetty's defaults"), + METASTORE_THRIFT_HTTP_RESPONSE_HEADER_SIZE("metastore.server.thrift.http.response.header.size", + "metastore.server.thrift.http.response.header.size", 6*1024, + "Response header size in bytes when using HTTP transport mode for metastore thrift server." 
+ + " Defaults to jetty's defaults"), + METASTORE_THRIFT_HTTP_MAX_IDLE_TIME("metastore.thrift.http.max.idle.time", "hive.metastore.thrift.http.max.idle.time", + 1800, TimeUnit.SECONDS, + "Maximum idle time for a connection on the server when in HTTP mode."), USE_SSL("metastore.use.SSL", "hive.metastore.use.SSL", false, "Set this to true for using SSL encryption in HMS server."), // We should somehow unify next two options. diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index 8854430f7a..d4bcb5b5e9 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -128,6 +128,8 @@ public class MetaStoreUtils { public static final String NO_VAL = " --- "; + public static final String USER_NAME_HTTP_HEADER = "x-actor-username"; + /** * Catches exceptions that cannot be handled and wraps them in MetaException. * @@ -1157,4 +1159,27 @@ public class MetaStoreUtils { } return result; } + + /** + * The config parameter can be like "path", "/path", "/path/", "path/*", "/path1/path2/*" and so on. 
+ * httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*" + * @param httpPath + * @return + */ + public static String getHttpPath(String httpPath) { + if (httpPath == null || httpPath.equals("")) { + httpPath = "/*"; + } else { + if (!httpPath.startsWith("/")) { + httpPath = "/" + httpPath; + } + if (httpPath.endsWith("/")) { + httpPath = httpPath + "*"; + } + if (!httpPath.endsWith("/*")) { + httpPath = httpPath + "/*"; + } + } + return httpPath; + } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java index 2b326b2922..cb5b170808 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java @@ -17,6 +17,13 @@ */ package org.apache.hadoop.hive.metastore.utils; +import com.google.common.base.Preconditions; +import java.io.FileInputStream; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import javax.net.ssl.SSLContext; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier; import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector; @@ -27,12 +34,21 @@ import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; +import org.apache.thrift.transport.THttpClient; import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TSocket; 
import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.ssl.DefaultHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +import org.apache.http.ssl.SSLContexts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -271,6 +287,36 @@ public class SecurityUtils { return getSSLSocketWithHttps(tSSLSocket); } + /* + Sets the ssl related configs in the underlying http client builder and wrap it up + in a THttpClient + */ + public static THttpClient getThriftHttpsClient(String httpsUrl, String trustStorePath, + String trustStorePasswd, String trustStoreAlgorithm, String trustStoreType, + HttpClientBuilder underlyingHttpClientBuilder) throws TTransportException, IOException, + KeyStoreException, NoSuchAlgorithmException, CertificateException, + KeyManagementException { + Preconditions.checkNotNull(underlyingHttpClientBuilder, "httpClientBuilder should not be null"); + if (trustStoreType == null || trustStoreType.isEmpty()) { + trustStoreType = KeyStore.getDefaultType(); + } + KeyStore sslTrustStore = KeyStore.getInstance(trustStoreType); + try (FileInputStream fis = new FileInputStream(trustStorePath)) { + sslTrustStore.load(fis, trustStorePasswd.toCharArray()); + } + + SSLContext sslContext = + SSLContexts.custom().setTrustManagerFactoryAlgorithm(trustStoreAlgorithm). 
+ loadTrustMaterial(sslTrustStore, null).build(); + SSLConnectionSocketFactory socketFactory = + new SSLConnectionSocketFactory(sslContext, new DefaultHostnameVerifier(null)); + final Registry<ConnectionSocketFactory> registry = + RegistryBuilder.<ConnectionSocketFactory> create().register("https", socketFactory) + .build(); + underlyingHttpClientBuilder.setConnectionManager(new BasicHttpClientConnectionManager(registry)); + return new THttpClient(httpsUrl, underlyingHttpClientBuilder.build()); + } + // Using endpoint identification algorithm as HTTPS enables us to do // CNAMEs/subjectAltName verification private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException { diff --git a/standalone-metastore/metastore-common/src/test/resources/log4j2.properties b/standalone-metastore/metastore-common/src/test/resources/log4j2.properties index 32f9b38404..bd4847c51f 100644 --- a/standalone-metastore/metastore-common/src/test/resources/log4j2.properties +++ b/standalone-metastore/metastore-common/src/test/resources/log4j2.properties @@ -22,8 +22,17 @@ appenders = console appender.console.type = Console appender.console.name = STDOUT appender.console.layout.type = PatternLayout -appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n +appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{5} - %msg%n rootLogger.level = debug rootLogger.appenderRefs = stdout rootLogger.appenderRef.stdout.ref = STDOUT + +loggers = HttpClient, JettyHttpServer + +logger.HttpClient.name = org.apache.http.client +logger.HttpClient.level = INFO + +logger.JettyHttpServer.name = org.eclipse.jetty.server +logger.JettyHttpServer.level = INFO + diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 6831e225a6..cf0b1ec8c1 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.metastore; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import org.apache.commons.cli.OptionBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ZKDeRegisterWatcher; @@ -35,6 +38,7 @@ import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager import org.apache.hadoop.hive.metastore.utils.CommonCliOptions; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.LogUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.security.SecurityUtil; @@ -51,10 +55,24 @@ import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.server.TServlet; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportFactory; + +import org.eclipse.jetty.security.ConstraintMapping; +import org.eclipse.jetty.security.ConstraintSecurityHandler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import 
org.eclipse.jetty.util.security.Constraint; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,8 +85,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -98,6 +114,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { private static ZooKeeperHiveHelper zooKeeperHelper = null; private static String msHost = null; + private static ThriftServer thriftServer; public static boolean isRenameAllowed(Database srcDB, Database destDB) { if (!srcDB.getName().equalsIgnoreCase(destDB.getName())) { @@ -195,7 +212,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { .create('p')); } - @Override public void parse(String[] args) { super.parse(args); @@ -230,6 +246,15 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } + /* + Interface to encapsulate Http and binary thrift server for + HiveMetastore + */ + private interface ThriftServer { + public void start() throws Throwable; + public boolean isRunning(); + } + /** * @param args */ @@ -346,21 +371,156 @@ public class HiveMetaStore extends ThriftHiveMetastore { startMetaStore(port, bridge, conf, false, null); } - /** - * Start Metastore based on a passed {@link HadoopThriftAuthBridge}. - * - * @param port The port on which the Thrift server will start to serve - * @param bridge - * @param conf Configuration overrides - * @param startMetaStoreThreads Start the background threads (initiator, cleaner, statsupdater, etc.) 
- * @param startedBackgroundThreads If startMetaStoreThreads is true, this AtomicBoolean will be switched to true, - * when all of the background threads are scheduled. Useful for testing purposes to wait - * until the MetaStore is fully initialized. - * @throws Throwable - */ - public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, - Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable { - isMetaStoreRemote = true; + // TODO: Is it worth trying to use a server that supports HTTP/2? + // Does the Thrift http client support this? + + private static ThriftServer startHttpMetastore(int port, Configuration conf) + throws Exception { + LOG.info("Attempting to start http metastore server on port: {}", port); + + // This check is likely pointless, especially with the current state of the http + // servlet which respects whatever comes in. Putting this in place for the moment + // only to enable testing on an otherwise secure cluster. + LOG.info(" Checking if security is enabled"); + if (UserGroupInformation.isSecurityEnabled()) { + LOG.info("Logging in via keytab while starting HTTP metastore"); + // Handle renewal + String kerberosName = SecurityUtil.getServerPrincipal(MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL), "0.0.0.0"); + String keyTabFile = MetastoreConf.getVar(conf, ConfVars.KERBEROS_KEYTAB_FILE); + UserGroupInformation.loginUserFromKeytab(kerberosName, keyTabFile); + } else { + LOG.info("Security is not enabled. 
Not logging in via keytab"); + } + + long maxMessageSize = MetastoreConf.getLongVar(conf, ConfVars.SERVER_MAX_MESSAGE_SIZE); + int minWorkerThreads = MetastoreConf.getIntVar(conf, ConfVars.SERVER_MIN_THREADS); + int maxWorkerThreads = MetastoreConf.getIntVar(conf, ConfVars.SERVER_MAX_THREADS); + // Server thread pool + // Start with minWorkerThreads, expand till maxWorkerThreads and reject + // subsequent requests + final String threadPoolNamePrefix = "HiveMetastore-HttpHandler-Pool"; + ExecutorService executorService = new ThreadPoolExecutor( + minWorkerThreads, maxWorkerThreads, 60, TimeUnit.SECONDS, + new SynchronousQueue<>(), new ThreadFactory() { + @Override + public Thread newThread(@NotNull Runnable r) { + Thread newThread = new Thread(r); + newThread.setName(threadPoolNamePrefix + ": Thread-" + newThread.getId()); + return newThread; + } + }); + ExecutorThreadPool threadPool = new ExecutorThreadPool((ThreadPoolExecutor) executorService); + // HTTP Server + org.eclipse.jetty.server.Server server = new Server(threadPool); + server.setStopAtShutdown(true); + + ServerConnector connector; + final HttpConfiguration httpServerConf = new HttpConfiguration(); + httpServerConf.setRequestHeaderSize( + MetastoreConf.getIntVar(conf, ConfVars.METASTORE_THRIFT_HTTP_REQUEST_HEADER_SIZE)); + httpServerConf.setResponseHeaderSize( + MetastoreConf.getIntVar(conf, ConfVars.METASTORE_THRIFT_HTTP_RESPONSE_HEADER_SIZE)); + + final HttpConnectionFactory http = new HttpConnectionFactory(httpServerConf); + + final boolean useSsl = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL); + String schemeName = useSsl ? 
"https" : "http"; + if (useSsl) { + String keyStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_PATH).trim(); + if (keyStorePath.isEmpty()) { + throw new IllegalArgumentException(ConfVars.SSL_KEYSTORE_PATH.toString() + + " Not configured for SSL connection"); + } + String keyStorePassword = + MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_KEYSTORE_PASSWORD); + String keyStoreType = + MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim(); + String keyStoreAlgorithm = + MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim(); + + SslContextFactory sslContextFactory = new SslContextFactory(); + String[] excludedProtocols = MetastoreConf.getVar(conf, ConfVars.SSL_PROTOCOL_BLACKLIST).split(","); + LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols)); + sslContextFactory.addExcludeProtocols(excludedProtocols); + LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " + + Arrays.toString(sslContextFactory.getExcludeProtocols())); + sslContextFactory.setKeyStorePath(keyStorePath); + sslContextFactory.setKeyStorePassword(keyStorePassword); + sslContextFactory.setKeyStoreType(keyStoreType); + sslContextFactory.setKeyManagerFactoryAlgorithm(keyStoreAlgorithm); + connector = new ServerConnector(server, sslContextFactory, http); + } else { + connector = new ServerConnector(server, http); + } + connector.setPort(port); + connector.setReuseAddress(true); + // TODO: What should the idle timeout be for the metastore? Currently it is 30 minutes + long maxIdleTimeout = MetastoreConf.getTimeVar(conf, ConfVars.METASTORE_THRIFT_HTTP_MAX_IDLE_TIME, + TimeUnit.MILLISECONDS); + connector.setIdleTimeout(maxIdleTimeout); + // TODO: AcceptQueueSize needs to be higher for HMS + connector.setAcceptQueueSize(maxWorkerThreads); + // TODO: Connection keepalive configuration? 
+ + server.addConnector(connector); + TProcessor processor; + boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL); + final TProtocolFactory protocolFactory; + if (useCompactProtocol) { + protocolFactory = new TCompactProtocol.Factory(); + } else { + protocolFactory = new TBinaryProtocol.Factory(); + } + + HMSHandler baseHandler = new HMSHandler("new db based metaserver", + conf); + IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf); + processor = new ThriftHiveMetastore.Processor<>(handler); + LOG.info("Starting DB backed MetaStore Server with generic processor"); + TServlet thriftHttpServlet = new HmsThriftHttpServlet(processor, protocolFactory); + + boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL); + HMSHandler.LOG.info("Direct SQL optimization = {}", directSqlEnabled); + + String httpPath = + MetaStoreUtils.getHttpPath( + MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_HTTP_PATH)); + + ServletContextHandler context = new ServletContextHandler( + ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS); + + // Tons of stuff skipped as compared to HS2. + // Sessions, XSRF, Compression, path configuration, etc. 
+ constraintHttpMethods(context, false); + server.setHandler(context); + + context.addServlet(new ServletHolder(thriftHttpServlet), httpPath); + + + return new ThriftServer() { + @Override + public void start() throws Throwable { + HMSHandler.LOG.debug("Starting HTTPServer for HMS"); + server.setStopAtShutdown(true); + server.start(); + HMSHandler.LOG.info("Started the new HTTPServer for metastore on port [" + port + + "]..."); + HMSHandler.LOG.info("Options.minWorkerThreads = " + + minWorkerThreads); + HMSHandler.LOG.info("Options.maxWorkerThreads = " + + maxWorkerThreads); + HMSHandler.LOG.info("Enable SSL = " + useSsl); + } + + @Override + public boolean isRunning() { + return server != null && server.isRunning(); + } + }; + } + + private static ThriftServer startBinaryMetastore(int port, HadoopThriftAuthBridge bridge, + Configuration conf) throws Throwable { // Server will create new threads up to max as necessary. After an idle // period, it will destroy threads to keep the number of threads in the // pool to min. @@ -396,12 +556,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { inputProtoFactory = new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize); } IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf); - TServerSocket serverSocket; - if (useSasl) { processor = saslServer.wrapProcessor( - new ThriftHiveMetastore.Processor<>(handler)); + new ThriftHiveMetastore.Processor<>(handler)); LOG.info("Starting DB backed MetaStore Server in Secure Mode"); } else { // we are in unsecure mode. 
@@ -430,9 +588,9 @@ public class HiveMetaStore extends ThriftHiveMetastore { String keyStorePassword = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_KEYSTORE_PASSWORD); String keyStoreType = - MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim(); + MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim(); String keyStoreAlgorithm = - MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim(); + MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim(); // enable SSL support for HMS List<String> sslVersionBlacklist = new ArrayList<>(); for (String sslVersion : MetastoreConf.getVar(conf, ConfVars.SSL_PROTOCOL_BLACKLIST).split(",")) { @@ -488,14 +646,71 @@ public class HiveMetaStore extends ThriftHiveMetastore { }; tServer.setServerEventHandler(tServerEventHandler); - LOG.info("Started the new metaserver on port [" + port - + "]..."); - LOG.info("Options.minWorkerThreads = " - + minWorkerThreads); - LOG.info("Options.maxWorkerThreads = " - + maxWorkerThreads); - LOG.info("TCP keepalive = " + tcpKeepAlive); - LOG.info("Enable SSL = " + useSSL); + return new ThriftServer() { + @Override + public void start() throws Throwable { + tServer.serve(); + HMSHandler.LOG.info("Started the new metaserver on port [" + port + + "]..."); + HMSHandler.LOG.info("Options.minWorkerThreads = " + + minWorkerThreads); + HMSHandler.LOG.info("Options.maxWorkerThreads = " + + maxWorkerThreads); + HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive); + HMSHandler.LOG.info("Enable SSL = " + useSSL); + } + + @Override + public boolean isRunning() { + return tServer != null && tServer.isServing(); + } + }; + } + + private static void constraintHttpMethods(ServletContextHandler ctxHandler, boolean allowOptionsMethod) { + Constraint c = new Constraint(); + c.setAuthenticate(true); + + ConstraintMapping cmt = new ConstraintMapping(); + cmt.setConstraint(c); + cmt.setMethod("TRACE"); + cmt.setPathSpec("/*"); + + ConstraintSecurityHandler 
securityHandler = new ConstraintSecurityHandler(); + if (!allowOptionsMethod) { + ConstraintMapping cmo = new ConstraintMapping(); + cmo.setConstraint(c); + cmo.setMethod("OPTIONS"); + cmo.setPathSpec("/*"); + securityHandler.setConstraintMappings(new ConstraintMapping[] {cmt, cmo}); + } else { + securityHandler.setConstraintMappings(new ConstraintMapping[] {cmt}); + } + ctxHandler.setSecurityHandler(securityHandler); + } + /** + * Start Metastore based on a passed {@link HadoopThriftAuthBridge}. + * + * @param port The port on which the Thrift server will start to serve + * @param bridge + * @param conf Configuration overrides + * @param startMetaStoreThreads Start the background threads (initiator, cleaner, statsupdater, etc.) + * @param startedBackgroundThreads If startMetaStoreThreads is true, this AtomicBoolean will be switched to true, + * when all of the background threads are scheduled. Useful for testing purposes to wait + * until the MetaStore is fully initialized. + * @throws Throwable + */ + public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, + Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable { + isMetaStoreRemote = true; + String transportMode = MetastoreConf.getVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "binary"); + boolean isHttpTransport = transportMode.equalsIgnoreCase("http"); + if (isHttpTransport) { + thriftServer = startHttpMetastore(port, conf); + } else { + thriftServer = startBinaryMetastore(port, bridge, conf); + } + logCompactionParameters(conf); boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL); @@ -507,7 +722,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { AtomicBoolean startedServing = new AtomicBoolean(); startMetaStoreThreads(conf, metaStoreThreadsLock, startCondition, startedServing, isMetastoreHousekeepingLeader(conf, getServerHostName()), startedBackgroundThreads); - signalOtherThreadsToStart(tServer, 
metaStoreThreadsLock, startCondition, startedServing); + signalOtherThreadsToStart(thriftServer, metaStoreThreadsLock, startCondition, startedServing); } // If dynamic service discovery through ZooKeeper is enabled, add this server to the ZooKeeper. @@ -526,7 +741,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } - tServer.serve(); + thriftServer.start(); } private static void logCompactionParameters(Configuration conf) { @@ -594,7 +809,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } - private static void signalOtherThreadsToStart(final TServer server, final Lock startLock, + private static void signalOtherThreadsToStart(final ThriftServer thriftServer, final Lock startLock, final Condition startCondition, final AtomicBoolean startedServing) { // A simple thread to wait until the server has started and then signal the other threads to @@ -608,7 +823,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { } catch (InterruptedException e) { LOG.warn("Signalling thread was interrupted: " + e.getMessage()); } - } while (!server.isServing()); + } while (!thriftServer.isRunning()); startLock.lock(); try { startedServing.set(true); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java new file mode 100644 index 0000000000..e58bd5634b --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java @@ -0,0 +1,113 @@ +/* * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Enumeration; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.TServlet; + +public class HmsThriftHttpServlet extends TServlet { + + private static final Logger LOG = LoggerFactory + .getLogger(HmsThriftHttpServlet.class); + + private static final String X_USER = MetaStoreUtils.USER_NAME_HTTP_HEADER; + + private final boolean isSecurityEnabled; + + public HmsThriftHttpServlet(TProcessor processor, + TProtocolFactory inProtocolFactory, TProtocolFactory outProtocolFactory) { + super(processor, inProtocolFactory, outProtocolFactory); + // This should ideally be receiving an instance of the Configuration which is used for the check + isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); + } + + public HmsThriftHttpServlet(TProcessor processor, + TProtocolFactory protocolFactory) { + super(processor, protocolFactory); + isSecurityEnabled = 
UserGroupInformation.isSecurityEnabled(); + } + + @Override + protected void doPost(HttpServletRequest request, + HttpServletResponse response) throws ServletException, IOException { + + Enumeration<String> headerNames = request.getHeaderNames(); + if (LOG.isDebugEnabled()) { + LOG.debug("Logging headers in request"); + while (headerNames.hasMoreElements()) { + String headerName = headerNames.nextElement(); + LOG.debug("Header: [{}], Value: [{}]", headerName, + request.getHeader(headerName)); + } + } + String userFromHeader = request.getHeader(X_USER); + if (userFromHeader == null || userFromHeader.isEmpty()) { + LOG.error("No user header: {} found", X_USER); + response.sendError(HttpServletResponse.SC_FORBIDDEN, + "Header: " + X_USER + " missing in the request"); + return; + } + + // TODO: These should ideally be in some kind of a Cache with Weak references. + // If HMS were to set up some kind of a session, this would go into the session by having + // this filter work with a custom Processor / or set the username into the session + // as is done for HS2. + // In case of HMS, it looks like each request is independent, and there is no session + // information, so the UGI needs to be set up in the Connection layer itself. + UserGroupInformation clientUgi; + // Temporary, and useless for now. Here only to allow this to work on an otherwise kerberized + // server. 
+ if (isSecurityEnabled) { + LOG.info("Creating proxy user for: {}", userFromHeader); + clientUgi = UserGroupInformation.createProxyUser(userFromHeader, UserGroupInformation.getLoginUser()); + } else { + LOG.info("Creating remote user for: {}", userFromHeader); + clientUgi = UserGroupInformation.createRemoteUser(userFromHeader); + } + + + PrivilegedExceptionAction<Void> action = new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + HmsThriftHttpServlet.super.doPost(request, response); + return null; + } + }; + + try { + clientUgi.doAs(action); + } catch (InterruptedException | RuntimeException e) { + LOG.error("Exception when executing http request as user: " + clientUgi.getUserName(), + e); + throw new ServletException(e); + } + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java similarity index 54% copy from standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java copy to standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java index 415988dea3..6e48607d33 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java @@ -15,50 +15,33 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hive.metastore; +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; -import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; -import org.junit.Assert; -import org.junit.Before; -import org.junit.experimental.categories.Category; - @Category(MetastoreCheckinTest.class) -public class TestRemoteHiveMetaStore extends TestHiveMetaStore { - private static boolean isServerStarted = false; - protected static int port; +public class TestRemoteHiveHttpMetaStore extends TestRemoteHiveMetaStore { - public TestRemoteHiveMetaStore() { - super(); - isThriftClient = true; - } - - @Before - public void setUp() throws Exception { - super.setUp(); - - if (isServerStarted) { - Assert.assertNotNull("Unable to connect to the MetaStore server", client); - return; - } + private static final Logger LOG = LoggerFactory.getLogger(TestRemoteHiveHttpMetaStore.class); - port = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), - conf); - System.out.println("Starting MetaStore Server on port " + port); - isServerStarted = true; - - // This is default case with setugi off for both client and server - client = createClient(); + @Override + public void start() throws Exception { + MetastoreConf.setVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "http"); + LOG.info("Attempting to start test remote metastore in http mode"); + super.start(); + LOG.info("Successfully started test remote metastore in http mode"); } @Override protected HiveMetaStoreClient createClient() throws Exception { - MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" + port); - MetastoreConf.setBoolVar(conf, 
ConfVars.EXECUTE_SET_UGI, false); - return new HiveMetaStoreClient(conf); + MetastoreConf.setVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE, "http"); + return super.createClient(); } -} \ No newline at end of file +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java index 415988dea3..9b07321529 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java @@ -45,7 +45,10 @@ public class TestRemoteHiveMetaStore extends TestHiveMetaStore { Assert.assertNotNull("Unable to connect to the MetaStore server", client); return; } + start(); + } + protected void start() throws Exception { port = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf); System.out.println("Starting MetaStore Server on port " + port); diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml index 8b242263ce..48fbe91487 100644 --- a/standalone-metastore/pom.xml +++ b/standalone-metastore/pom.xml @@ -103,6 +103,7 @@ <spotbugs.version>4.0.3</spotbugs.version> <caffeine.version>2.8.4</caffeine.version> <slf4j.version>1.7.30</slf4j.version> + <httpcomponents.core.version>4.4.13</httpcomponents.core.version> <!-- Thrift properties --> <thrift.home>you-must-set-this-to-run-thrift</thrift.home> <thrift.gen.dir>${basedir}/src/gen/thrift</thrift.gen.dir> @@ -361,6 +362,11 @@ <scope>runtime</scope> <optional>true</optional> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>${httpcomponents.core.version}</version> + </dependency> <!-- test scope dependencies --> <dependency> <groupId>junit</groupId>