Repository: hive
Updated Branches:
  refs/heads/master 22af0eff0 -> de2a5d63a
HIVE-14901: HiveServer2: Use user supplied fetch size to determine #rows serialized in tasks (Norris Lee reviewed by Vaibhav Gumashta) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/de2a5d63 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/de2a5d63 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/de2a5d63 Branch: refs/heads/master Commit: de2a5d63af78ffb6ef97062b06b9f3eb9557e361 Parents: 22af0ef Author: Vaibhav Gumashta <vgumas...@hortonworks.com> Authored: Mon Mar 6 14:26:24 2017 -0800 Committer: Vaibhav Gumashta <vgumas...@hortonworks.com> Committed: Mon Mar 6 14:26:54 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 +- .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 30 ++++++++++++++++++++ .../org/apache/hive/jdbc/HiveConnection.java | 10 +++++++ .../serde2/thrift/ThriftJDBCBinarySerDe.java | 4 ++- .../org/apache/hive/service/cli/CLIService.java | 7 ++++- .../hive/service/cli/CLIServiceClient.java | 4 +-- .../hive/service/cli/session/HiveSession.java | 10 ++++++- .../service/cli/session/HiveSessionImpl.java | 24 ++++++++++++++++ .../service/cli/thrift/ThriftCLIService.java | 19 +++++++++++-- 9 files changed, 103 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a8aaa5c..676c527 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2619,7 +2619,7 @@ public class HiveConf extends Configuration { // TODO: Make use of this config to configure fetch size 
HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size", 10000, "Max number of rows sent in one Fetch RPC call by the server to the client."), - HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE("hive.server2.resultset.default.fetch.size", 10000, + HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE("hive.server2.thrift.resultset.default.fetch.size", 1000, "The number of rows sent in one Fetch RPC call by the server to the client, if not\n" + "specified by the client."), HIVE_SERVER2_XSRF_FILTER_ENABLED("hive.server2.xsrf.filter.enabled",false, @@ -4237,6 +4237,7 @@ public class HiveConf extends Configuration { "hive\\.parquet\\..*", "hive\\.ppd\\..*", "hive\\.prewarm\\..*", + "hive\\.server2\\.thrift\\.resultset\\.default\\.fetch\\.size", "hive\\.server2\\.proxy\\.user", "hive\\.skewjoin\\..*", "hive\\.smbjoin\\..*", http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 3d4057b..afe23f8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -1321,4 +1321,34 @@ public class TestJdbcWithMiniHS2 { fs.delete(testPath, true); } } + + @Test + public void testFetchSize() throws Exception { + // Test setting fetch size below max + Connection fsConn = getConnection(miniHS2.getJdbcURL("default", "fetchSize=50", ""), + System.getProperty("user.name"), "bar"); + Statement stmt = fsConn.createStatement(); + stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true"); + int fetchSize = stmt.getFetchSize(); + assertEquals(50, fetchSize); + stmt.close(); + fsConn.close(); + // Test setting fetch size 
above max + fsConn = getConnection( + miniHS2.getJdbcURL( + "default", + "fetchSize=" + (miniHS2.getHiveConf().getIntVar( + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE) + 1), + ""), + System.getProperty("user.name"), "bar"); + stmt = fsConn.createStatement(); + stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true"); + fetchSize = stmt.getFetchSize(); + assertEquals( + miniHS2.getHiveConf().getIntVar( + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE), + fetchSize); + stmt.close(); + fsConn.close(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 535ad3d..ed899c6 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -639,6 +639,9 @@ public class HiveConnection implements java.sql.Connection { } // switch the database openConf.put("use:database", connParams.getDbName()); + // set the fetchSize + openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size", + Integer.toString(fetchSize)); // set the session configuration Map<String, String> sessVars = connParams.getSessionVars(); @@ -664,6 +667,13 @@ public class HiveConnection implements java.sql.Connection { } protocol = openResp.getServerProtocolVersion(); sessHandle = openResp.getSessionHandle(); + + // Update fetchSize if modified by server + String serverFetchSize = + openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size"); + if (serverFetchSize != null) { + fetchSize = Integer.parseInt(serverFetchSize); + } } catch (TException e) { LOG.error("Error opening session", e); throw new SQLException("Could not establish connection to " 
http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java index 84ed6ba..16cc916 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java @@ -71,7 +71,9 @@ public class ThriftJDBCBinarySerDe extends AbstractSerDe { @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { // Get column names - MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); + MAX_BUFFERED_ROWS = + HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE); + LOG.info("ThriftJDBCBinarySerDe max number of buffered columns: " + MAX_BUFFERED_ROWS); String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? 
tbl http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/CLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index a009e25..5bb4ee0 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -81,7 +81,7 @@ public class CLIService extends CompositeService implements ICLIService { public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; sessionManager = new SessionManager(hiveServer2); - defaultFetchRows = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE); + defaultFetchRows = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE); addService(sessionManager); // If the hadoop cluster is secure, do a kerberos login for the service from the keytab if (UserGroupInformation.isSecurityEnabled()) { @@ -476,6 +476,11 @@ public class CLIService extends CompositeService implements ICLIService { return opStatus; } + public HiveConf getSessionConf(SessionHandle sessionHandle) + throws HiveSQLException { + return sessionManager.getSession(sessionHandle).getSessionConf(); + } + private static final long PROGRESS_MAX_WAIT_NS = 30 * 1000000000l; private JobProgressUpdate progressUpdateLog(boolean isProgressLogRequested, Operation operation) { if (!isProgressLogRequested || !ServiceUtils.canProvideProgressLog(hiveConf) http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java index 4b84872..c965abc 100644 --- 
a/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java @@ -31,10 +31,10 @@ import org.apache.hive.service.auth.HiveAuthFactory; * */ public abstract class CLIServiceClient implements ICLIService { - protected int defaultFetchRows = ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal; + protected int defaultFetchRows = ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal; public CLIServiceClient(Configuration conf) { - defaultFetchRows = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE); + defaultFetchRows = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE); } public SessionHandle openSession(String username, String password) http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/session/HiveSession.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java index e5d865b..bd4d90d 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hive.service.auth.HiveAuthFactory; @@ -183,7 +184,14 @@ public interface HiveSession extends HiveSessionBase { String primarySchema, String primaryTable, String foreignCatalog, String foreignSchema, String foreignTable) throws HiveSQLException; - + + /** + * + * @return + * @throws HiveSQLException + */ + HiveConf getSessionConf() throws HiveSQLException; + /** * close the 
session * @throws HiveSQLException http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index f939a93..fd74d55 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -177,6 +177,9 @@ public class HiveSessionImpl implements HiveSession { } // Process global init file: .hiverc processGlobalInitFile(); + // Set fetch size in session conf map + sessionConfMap = setFetchSize(sessionConfMap); + if (sessionConfMap != null) { configureSession(sessionConfMap); } @@ -245,6 +248,22 @@ public class HiveSessionImpl implements HiveSession { } } + private Map<String, String> setFetchSize(Map<String, String> sessionConfMap) { + int maxFetchSize = + sessionConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); + String confFetchSize = sessionConfMap != null ? + sessionConfMap.get( + "set:hiveconf:" + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname) : + null; + if (confFetchSize != null && !confFetchSize.isEmpty()) { + int fetchSize = Integer.parseInt(confFetchSize); + sessionConfMap.put( + "set:hiveconf:" + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname, + Integer.toString(fetchSize > maxFetchSize ? 
maxFetchSize : fetchSize)); + } + return sessionConfMap; + } + private void configureSession(Map<String, String> sessionConfMap) throws HiveSQLException { SessionState.setCurrentSessionState(sessionState); for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) { @@ -465,6 +484,11 @@ public class HiveSessionImpl implements HiveSession { } @Override + public HiveConf getSessionConf() throws HiveSQLException { + return this.sessionConf; + } + + @Override public OperationHandle executeStatement(String statement, Map<String, String> confOverlay) throws HiveSQLException { return executeStatementInternal(statement, confOverlay, false, 0); } http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 211b33b..0fdc8d9 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -317,10 +317,19 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { + Map<String, String> openConf = req.getConfiguration(); + SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); - // TODO: set real configuration map - resp.setConfiguration(new HashMap<String, String>()); + Map<String, String> configurationMap = new HashMap<String, String>(); + // Set the updated fetch size from the server into the configuration map for the client + HiveConf sessionConf = cliService.getSessionConf(sessionHandle); + configurationMap.put( + 
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname, + Integer.toString(sessionConf != null ? + sessionConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE) : + hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE))); + resp.setConfiguration(configurationMap); resp.setStatus(OK_STATUS); ThriftCLIServerContext context = (ThriftCLIServerContext)currentServerContext.get(); @@ -733,6 +742,12 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { TFetchResultsResp resp = new TFetchResultsResp(); try { + // Set fetch size + int maxFetchSize = + hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); + if (req.getMaxRows() > maxFetchSize) { + req.setMaxRows(maxFetchSize); + } RowSet rowSet = cliService.fetchResults( new OperationHandle(req.getOperationHandle()), FetchOrientation.getFetchOrientation(req.getOrientation()),