This is an automated email from the ASF dual-hosted git repository. dengzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 088f4d71a8c HIVE-26633: Make Thrift MaxMessageSize configurable (John Sherman, reviewed by Aman Sinha, Zhihua Deng) 088f4d71a8c is described below commit 088f4d71a8c45b6f44d9f4be470f6eb7807a9ed8 Author: John Sherman <j...@cloudera.com> AuthorDate: Wed Oct 19 19:56:31 2022 -0700 HIVE-26633: Make Thrift MaxMessageSize configurable (John Sherman, reviewed by Aman Sinha, Zhihua Deng) Closes #3674 --- .../hadoop/hive/common/auth/HiveAuthUtils.java | 101 +++++++++++++++++---- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +- .../minikdc/TestRemoteHiveMetaStoreKerberos.java | 50 ++++++++++ .../apache/hive/minikdc/TestSSLWithMiniKdc.java | 32 +++++++ .../java/org/apache/hive/jdbc/miniHS2/MiniHS2.java | 7 +- .../java/org/apache/hive/jdbc/HiveConnection.java | 29 +++++- jdbc/src/java/org/apache/hive/jdbc/Utils.java | 1 + .../cli/thrift/RetryingThriftCLIServiceClient.java | 4 +- .../hadoop/hive/metastore/HiveMetaStoreClient.java | 19 +++- .../hadoop/hive/metastore/conf/MetastoreConf.java | 4 + .../hive/metastore/security/TFilterTransport.java | 24 ++--- .../hadoop/hive/metastore/TestHiveMetaStore.java | 9 +- 12 files changed, 237 insertions(+), 48 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/common/auth/HiveAuthUtils.java b/common/src/java/org/apache/hadoop/hive/common/auth/HiveAuthUtils.java index 16b6bf77b91..a37d527c1ea 100644 --- a/common/src/java/org/apache/hadoop/hive/common/auth/HiveAuthUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/auth/HiveAuthUtils.java @@ -33,9 +33,8 @@ import javax.net.ssl.TrustManagerFactory; import com.google.common.base.Splitter; import com.google.common.collect.Sets; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.thrift.TConfiguration; import org.apache.thrift.transport.TSSLTransportFactory; +import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; @@ -50,45 +49,107 @@ import org.slf4j.LoggerFactory; public class HiveAuthUtils { private static final Logger LOG = LoggerFactory.getLogger(HiveAuthUtils.class); + /** + * Configure the provided T transport's max message size. + * @param transport Transport to configure maxMessage for + * @param maxMessageSize Maximum allowed message size in bytes, less than or equal to 0 means use the Thrift library + * default. + * @return The passed in T transport configured with desired max message size. The same object passed in is returned. + */ + public static <T extends TTransport> T configureThriftMaxMessageSize(T transport, int maxMessageSize) { + if (maxMessageSize > 0) { + if (transport.getConfiguration() == null) { + LOG.warn("TTransport {} is returning a null Configuration, Thrift max message size is not getting configured", + transport.getClass().getName()); + return transport; + } + transport.getConfiguration().setMaxMessageSize(maxMessageSize); + } + return transport; + } + + /** + * Create a TSocket for the provided host and port with specified loginTimeout. Thrift maxMessageSize + * will default to Thrift library default. + * @param host Host to connect to. + * @param port Port to connect to. + * @param loginTimeout Socket timeout (0 means no timeout). + * @return TTransport TSocket for host/port. + */ public static TTransport getSocketTransport(String host, int port, int loginTimeout) throws TTransportException { - return new TSocket(new TConfiguration(),host, port, loginTimeout); + return getSocketTransport(host, port, loginTimeout, /* maxMessageSize */ -1); + } + + /** + * Create a TSocket for the provided host and port with specified loginTimeout and maxMessageSize. + * will default to Thrift library default. + * @param host Host to connect to. + * @param port Port to connect to. + * @param loginTimeout Socket timeout (0 means no timeout). + * @param maxMessageSize Size in bytes for max allowable Thrift message size, less than or equal to 0 + * results in using the Thrift library default. + * @return TTransport TSocket for host/port + */ + public static TTransport getSocketTransport(String host, int port, int loginTimeout, int maxMessageSize) + throws TTransportException { + TSocket tSocket = new TSocket(host, port, loginTimeout); + return configureThriftMaxMessageSize(tSocket, maxMessageSize); + } + + public static TTransport getSSLSocket(String host, int port, int loginTimeout, TSSLTransportParameters params, + int maxMessageSize) throws TTransportException { + // The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT and + // SSLContext created with the given params + TSocket tSSLSocket = null; + if (params != null) { + tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout, params); + } else { + tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout); + } + return getSSLSocketWithHttps(tSSLSocket, maxMessageSize); + } + + public static TTransport getSSLSocket(String host, int port, int loginTimeout, String trustStorePath, + String trustStorePassWord, String trustStoreType, String trustStoreAlgorithm) throws TTransportException { + return getSSLSocket(host, port, loginTimeout, trustStorePath, trustStorePassWord, trustStoreType, + trustStoreAlgorithm, /* maxMessageSize */ -1); } - public static TTransport getSSLSocket(String host, int port, int loginTimeout) - throws TTransportException { - // The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT - TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout); - return getSSLSocketWithHttps(tSSLSocket); + public static TTransport getSSLSocket(String host, int port, int loginTimeout) throws TTransportException { + return getSSLSocket(host, port, loginTimeout, /* maxMessageSize */ -1); } - public static TTransport getSSLSocket(String host, int port, int loginTimeout, - String trustStorePath, String trustStorePassWord, String trustStoreType, - String trustStoreAlgorithm) throws TTransportException { - TSSLTransportFactory.TSSLTransportParameters params = - new TSSLTransportFactory.TSSLTransportParameters(); + public static TTransport getSSLSocket(String host, int port, int loginTimeout, int maxMessageSize) + throws TTransportException { + return getSSLSocket(host, port, loginTimeout, /* params */ null, maxMessageSize); + } + + public static TTransport getSSLSocket(String host, int port, int loginTimeout, String trustStorePath, + String trustStorePassWord, String trustStoreType, String trustStoreAlgorithm, int maxMessageSize) + throws TTransportException { + TSSLTransportParameters params = new TSSLTransportParameters(); String tStoreType = trustStoreType.isEmpty()? KeyStore.getDefaultType() : trustStoreType; String tStoreAlgorithm = trustStoreAlgorithm.isEmpty()? TrustManagerFactory.getDefaultAlgorithm() : trustStoreAlgorithm; params.setTrustStore(trustStorePath, trustStorePassWord, tStoreAlgorithm, tStoreType); params.requireClientAuth(true); - // The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT and - // SSLContext created with the given params - TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, loginTimeout, params); - return getSSLSocketWithHttps(tSSLSocket); + return getSSLSocket(host, port, loginTimeout, params, maxMessageSize); } // Using endpoint identification algorithm as HTTPS enables us to do // CNAMEs/subjectAltName verification - private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException { + private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int maxMessageSize) + throws TTransportException { SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket(); SSLParameters sslParams = sslSocket.getSSLParameters(); sslParams.setEndpointIdentificationAlgorithm("HTTPS"); sslSocket.setSSLParameters(sslParams); - return new TSocket(sslSocket); + TSocket tSocket = new TSocket(sslSocket); + return configureThriftMaxMessageSize(tSocket, maxMessageSize); } public static TServerSocket getServerSocket(String hiveHost, int portNum) - throws TTransportException { + throws TTransportException { InetSocketAddress serverAddress; if (hiveHost == null || hiveHost.isEmpty()) { // Wildcard bind diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f0b1a7c1bfd..1e06e0a956b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2906,7 +2906,10 @@ public class HiveConf extends Configuration { HIVE_STATS_MAX_NUM_STATS("hive.stats.max.num.stats", (long) 10000, "When the number of stats to be updated is huge, this value is used to control the number of \n" + " stats to be sent to HMS for update."), - + HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE("hive.thrift.client.max.message.size", "1gb", + new SizeValidator(-1L, true, (long) Integer.MAX_VALUE, true), + "Thrift client configuration for max message size. 0 or -1 will use the default defined in the Thrift " + + "library. The upper limit is 2147483648 bytes (or 2gb)."), // Concurrency HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false, "Whether Hive supports concurrency control or not. \n" + diff --git a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java index 50be9d76984..c24cb5b62bb 100644 --- a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java +++ b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java @@ -18,11 +18,24 @@ package org.apache.hive.minikdc; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.TestRemoteHiveMetaStore; +import org.apache.hadoop.hive.metastore.TestHiveMetaStore; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.thrift.transport.TTransportException; import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; public class TestRemoteHiveMetaStoreKerberos extends TestRemoteHiveMetaStore { private static MiniHiveKdc miniKDC; @@ -45,6 +58,43 @@ public class TestRemoteHiveMetaStoreKerberos extends TestRemoteHiveMetaStore { super.setUp(); } + @Test + public void testThriftMaxMessageSize() throws Throwable { + String dbName = "compdb"; + String tblName = "comptbl"; + String typeName = "Person"; + + cleanUp(dbName, tblName, typeName); + List<List<String>> values = new ArrayList<>(); + values.add(makeVals("2008-07-01 14:13:12", "14")); + values.add(makeVals("2008-07-01 14:13:12", "15")); + values.add(makeVals("2008-07-02 14:13:12", "15")); + values.add(makeVals("2008-07-03 14:13:12", "151")); + + createMultiPartitionTableSchema(dbName, tblName, typeName, values); + + Configuration clientConf = MetastoreConf.newMetastoreConf(new Configuration(conf)); + MetastoreConf.setVar(clientConf, ConfVars.THRIFT_URIS, "thrift://localhost:" + port); + // set to a low value to prove THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE is being honored + // (it should throw an exception) + MetastoreConf.setVar(clientConf, ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "1024"); + HiveMetaStoreClient limitedClient = new HiveMetaStoreClient(clientConf); + Exception expectedException = assertThrows(TTransportException.class, () -> { + limitedClient.listPartitions(dbName, tblName, (short)-1); + }); + String exceptionMessage = expectedException.getMessage(); + // Verify the Thrift library is enforcing the limit + assertTrue(exceptionMessage.contains("MaxMessageSize reached")); + limitedClient.close(); + + // test default client (with a default THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE) + List<Partition> partitions = client.listPartitions(dbName, tblName, (short) -1); + assertNotNull(partitions); + assertEquals("expected to receive the same number of partitions added", values.size(), partitions.size()); + + cleanUp(dbName, tblName, typeName); + } + @Override protected HiveMetaStoreClient createClient() throws Exception { MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" + port); diff --git a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java index cea9503697a..7ca74efb648 100644 --- a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java +++ b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java @@ -24,16 +24,24 @@ import java.sql.Statement; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.ColumnType; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.thrift.transport.TTransportException; import org.hadoop.hive.jdbc.SSLTestUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; public class TestSSLWithMiniKdc { @@ -84,6 +92,30 @@ public class TestSSLWithMiniKdc { stmt.close(); } + @Test + public void testHmsThriftMaxMessageSize() throws Exception { + Configuration clientConf = MetastoreConf.newMetastoreConf(new Configuration(miniHS2.getHiveConf())); + MetastoreConf.setVar(clientConf, MetastoreConf.ConfVars.THRIFT_URIS, "thrift://localhost:" + miniHS2.getHmsPort()); + // set to a low value to prove THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE is being honored + // (it should throw an exception) + MetastoreConf.setVar(clientConf, MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "512"); + HiveMetaStoreClient limitedClient = new HiveMetaStoreClient(clientConf); + String dbName = "default"; + String tableName = "testThriftMaxMessageSize"; + TableBuilder tblBuilder = new TableBuilder().setDbName(dbName).setTableName(tableName); + for (int i = 0; i <= 10; i++) { + tblBuilder.addCol("abcdefghijklmnopqrstuvwxyz" + i, ColumnType.STRING_TYPE_NAME); + } + tblBuilder.create(limitedClient, clientConf); + Exception expectedException = assertThrows(TTransportException.class, () -> { + limitedClient.getTable(dbName, tableName); + }); + String exceptionMessage = expectedException.getMessage(); + // Verify the Thrift library is enforcing the limit + assertTrue(exceptionMessage.contains("MaxMessageSize reached")); + limitedClient.close(); + } + private Connection getConnection(String userName) throws Exception { miniHiveKdc.loginUser(userName); return DriverManager.getConnection(miniHS2.getJdbcURL("default", SSLTestUtils.SSL_CONN_PARAMS), diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index e85a6bcd9f7..9e95d3b2db9 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -87,6 +87,7 @@ public class MiniHS2 extends AbstractHiveService { private boolean usePortsFromConf = false; private PamAuthenticator pamAuthenticator; private boolean createTransactionalTables; + private int hmsPort = 0; public enum MiniClusterType { MR, @@ -372,7 +373,7 @@ public class MiniHS2 extends AbstractHiveService { public void start(Map<String, String> confOverlay) throws Exception { if (isMetastoreRemote) { - MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), getHiveConf(), + hmsPort = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), getHiveConf(), false, false, false, false, createTransactionalTables); setWareHouseDir(MetastoreConf.getVar(getHiveConf(), MetastoreConf.ConfVars.WAREHOUSE)); } @@ -728,4 +729,8 @@ public class MiniHS2 extends AbstractHiveService { // Ignore. Safe if it does not exist. } } + + public int getHmsPort() { + return hmsPort; + } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index abc543843a5..4a6fb7c423d 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -534,6 +534,7 @@ public class HiveConnection implements java.sql.Connection { validateSslForBrowserMode(); httpClient = getHttpClient(useSsl); transport = new THttpClient(getServerHttpUrl(useSsl), httpClient); + HiveAuthUtils.configureThriftMaxMessageSize(transport, getMaxMessageSize()); return transport; } @@ -849,8 +850,10 @@ public class HiveConnection implements java.sql.Connection { * * @return TTransport * @throws TTransportException + * @throws SQLException */ - private TTransport createUnderlyingTransport() throws TTransportException { + private TTransport createUnderlyingTransport() throws TTransportException, SQLException { + int maxMessageSize = getMaxMessageSize(); TTransport transport = null; // Note: Thrift returns an SSL socket that is already bound to the specified host:port // Therefore an open called on this would be a no-op later @@ -864,7 +867,7 @@ public class HiveConnection implements java.sql.Connection { JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD); if (sslTrustStore == null || sslTrustStore.isEmpty()) { - transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout); + transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout, maxMessageSize); } else { String trustStoreType = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE_TYPE); @@ -876,16 +879,32 @@ public class HiveConnection implements java.sql.Connection { if (trustStoreAlgorithm == null) { trustStoreAlgorithm = ""; } - transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout, - sslTrustStore, sslTrustStorePassword, trustStoreType, trustStoreAlgorithm); + transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout, sslTrustStore, sslTrustStorePassword, + trustStoreType, trustStoreAlgorithm, maxMessageSize); } } else { // get non-SSL socket transport - transport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout); + transport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout, maxMessageSize); } return transport; } + private int getMaxMessageSize() throws SQLException { + String maxMessageSize = sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE); + if (maxMessageSize == null) { + return -1; + } + + try { + return Integer.parseInt(maxMessageSize); + } catch (Exception e) { + String errFormat = "Invalid {} configuration of '{}'. Expected an integer specifying number of bytes. " + + "A configuration of <= 0 uses default max message size."; + String errMsg = String.format(errFormat, JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE, maxMessageSize); + throw new SQLException(errMsg, "42000", e); + } + } + /** * Create transport per the connection options * Supported transport options are: diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index 764ae11b08b..c1be6a52df4 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -165,6 +165,7 @@ public class Utils { // Create external purge table by default static final String CREATE_TABLE_AS_EXTERNAL = "hiveCreateAsExternalLegacy"; public static final String SOCKET_TIMEOUT = "socketTimeout"; + static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE = "thrift.client.max.message.size"; // We support ways to specify application name modeled after some existing DBs, since // there's no standard approach. diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java index 53641f31c84..9079c652d0a 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import javax.security.sasl.SaslException; +import org.apache.hadoop.hive.common.auth.HiveAuthUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.PlainSaslHelper; @@ -309,9 +310,10 @@ public class RetryingThriftCLIServiceClient implements InvocationHandler { String host = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); int port = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT); + int maxThriftMessageSize = (int) conf.getSizeVar(HiveConf.ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE); LOG.info("Connecting to " + host + ":" + port); - transport = new TSocket(host, port); + transport = HiveAuthUtils.getSocketTransport(host, port, 0, maxThriftMessageSize); ((TSocket) transport).setTimeout((int) conf.getTimeVar(HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT, TimeUnit.SECONDS) * 1000); try { diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 2ed37e7c111..53c7c6ba7a2 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -608,6 +608,19 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { client.rename_partition_req(req); } + private <T extends TTransport> T configureThriftMaxMessageSize(T transport) { + int maxThriftMessageSize = (int) MetastoreConf.getSizeVar(conf, ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE); + if (maxThriftMessageSize > 0) { + if (transport.getConfiguration() == null) { + LOG.warn("TTransport {} is returning a null Configuration, Thrift max message size is not getting configured", + transport.getClass().getName()); + return transport; + } + transport.getConfiguration().setMaxMessageSize(maxThriftMessageSize); + } + return transport; + } + /* Creates a THttpClient if HTTP mode is enabled. If Client auth mode is set to JWT, then the method fetches JWT from environment variable: HMS_JWT and sets in auth @@ -681,7 +694,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } } LOG.debug("Created thrift http client for URL: " + httpUrl); - return tHttpClient; + return configureThriftMaxMessageSize(tHttpClient); } private TTransport createBinaryClient(URI store, boolean useSSL) throws TTransportException, @@ -705,7 +718,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { binaryTransport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout, trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm); } else { - binaryTransport = new TSocket(new TConfiguration(),store.getHost(), store.getPort(), + binaryTransport = new TSocket(new TConfiguration(), store.getHost(), store.getPort(), clientSocketTimeout); } binaryTransport = createAuthBinaryTransport(store, binaryTransport); @@ -718,7 +731,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } } LOG.debug("Created thrift binary client for URI: " + store); - return binaryTransport; + return configureThriftMaxMessageSize(binaryTransport); } private void open() throws MetaException { diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index d3c08a540cf..460e41b418e 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1399,6 +1399,10 @@ public class MetastoreConf { "If dynamic service discovery mode is set, the URIs are used to connect to the" + " corresponding service discovery servers e.g. a zookeeper. Otherwise they are " + "used as URIs for remote metastore."), + THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE("metastore.thrift.client.max.message.size", + "hive.thrift.client.max.message.size", "1gb", new SizeValidator(-1L, true, (long) Integer.MAX_VALUE, true), + "Thrift client configuration for max message size. 0 or -1 will use the default defined in the Thrift " + + "library. The upper limit is 2147483648 bytes (or 2gb)."), THRIFT_SERVICE_DISCOVERY_MODE("metastore.service.discovery.mode", "hive.metastore.service.discovery.mode", "", diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java index 5ff672c5444..e9670e38ded 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java @@ -98,18 +98,18 @@ import org.apache.thrift.transport.TTransportException; wrapped.consumeBuffer(len); } - @Override - public TConfiguration getConfiguration() { - return null; - } - - @Override - public void updateKnownMessageSize(long l) throws TTransportException { - - } + @Override + public TConfiguration getConfiguration() { + return wrapped.getConfiguration(); + } - @Override - public void checkReadBytesAvailable(long l) throws TTransportException { + @Override + public void updateKnownMessageSize(long l) throws TTransportException { + wrapped.updateKnownMessageSize(l); + } - } + @Override + public void checkReadBytesAvailable(long l) throws TTransportException { + wrapped.checkReadBytesAvailable(l); + } } diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java index 67fc7063662..5217e8d8506 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java @@ -495,7 +495,7 @@ public abstract class TestHiveMetaStore { assertTrue("Not all parts returned", mpartial.containsAll(expectedPartitions)); } - private static List<String> makeVals(String ds, String id) { + public static List<String> makeVals(String ds, String id) { List <String> vals4 = new ArrayList<>(2); vals4.add(ds); vals4.add(id); @@ -552,7 +552,6 @@ public abstract class TestHiveMetaStore { " partitions",values.size(), partitions.size()); cleanUp(dbName, tblName, typeName); - } @Test @@ -3005,7 +3004,7 @@ public abstract class TestHiveMetaStore { stmt.executeUpdate(); } - private void cleanUp(String dbName, String tableName, String typeName) throws Exception { + protected void cleanUp(String dbName, String tableName, String typeName) throws Exception { if(dbName != null && tableName != null) { client.dropTable(dbName, tableName); } @@ -3017,7 +3016,7 @@ public abstract class TestHiveMetaStore { } } - private Database createDb(String dbName) throws Exception { + protected Database createDb(String dbName) throws Exception { if(null == dbName) { return null; } return new DatabaseBuilder() .setName(dbName) @@ -3101,7 +3100,7 @@ public abstract class TestHiveMetaStore { return partitions; } - private List<Partition> createMultiPartitionTableSchema(String dbName, String tblName, + protected List<Partition> createMultiPartitionTableSchema(String dbName, String tblName, String typeName, List<List<String>> values) throws Throwable { return createMultiPartitionTableSchema(null, dbName, tblName, typeName, values); }