This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit 7871199a49c692deef70628bc12ef022f4099bd3 Author: Kurt Deschler <[email protected]> AuthorDate: Sat Sep 16 20:53:46 2023 -0500 HIVE-27872: Support multi-stream parallel fetch in JDBC driver (Kurt Deschler, reviewed by Attila Turoczy, Denys Kuzmenko) This patch enables JDBC to open multiple sockets to an HS2 service and performance concurrent fetch results for a single query. This can significantly speed up fetching of large results that are bottlenecked on Thrift serialization, de-serialization, and string conversion. With adequate threads, fetch performance will now only be limited by the single-threaded client-side result processing and server-size row materialization. Added JDBC Client parameter fetchThreads to control the number of threads allocated for fetching. Setting fetchThreads=1 will pipeline the Fetch using the existing connection asynchronously. Setting fetchThreads>1 will cause an additional Thrift connection to be opened to the server for each thread. Care should be taken not to over-allocate connections to the server. Added new HiveConf parameter hive.jdbc.fetch.threads to allow config of fetchThrads from server conf. Closes #4902 --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../java/org/apache/hive/jdbc/HiveConnection.java | 62 ++++- .../org/apache/hive/jdbc/HiveQueryResultSet.java | 267 ++++++++++++++++++--- .../java/org/apache/hive/jdbc/HiveStatement.java | 16 +- jdbc/src/java/org/apache/hive/jdbc/Utils.java | 11 + 5 files changed, 318 insertions(+), 40 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cf564da18d4..9c04dc061b9 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1960,6 +1960,8 @@ public class HiveConf extends Configuration { HIVE_ENABLE_JDBC_SAFE_PUSHDOWN("hive.jdbc.pushdown.safe.enable", false, "Flag to control enabling pushdown of operators using Calcite that prevent splitting results\n" + "retrieval in the JDBC storage handler"), + HIVE_JDBC_FETCH_THREADS("hive.jdbc.fetch.threads", 1, + "Controls the number of thread/connections used to fetch results for a JDBC query"), // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row, // need to remove by hive .13. Also, do not change default (see SMB operator) diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 015a9f95cd6..b90a9c987f0 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -19,6 +19,7 @@ package org.apache.hive.jdbc; import static org.apache.hadoop.hive.conf.Constants.MODE; +import static org.apache.hive.service.auth.HiveAuthConstants.AuthTypes; import static org.apache.hive.service.cli.operation.hplsql.HplSqlQueryExecutor.HPLSQL; import java.io.BufferedReader; @@ -106,6 +107,7 @@ import org.apache.hive.service.auth.HiveAuthConstants; import org.apache.hive.service.auth.KerberosSaslHelper; import org.apache.hive.service.auth.PlainSaslHelper; import org.apache.hive.service.auth.SaslQOP; + import org.apache.hive.service.cli.session.SessionUtils; import org.apache.hive.service.rpc.thrift.TCLIService; import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq; @@ -177,6 +179,7 @@ public class HiveConnection implements java.sql.Connection { private int loginTimeout = 0; private TProtocolVersion protocol; int fetchSize; + int fetchThreads; private String initFile = null; private String wmPool = null, wmApp = null; private Properties clientInfo; @@ -184,6 +187,8 @@ public class HiveConnection implements java.sql.Connection { private int maxRetries = 1; private IJdbcBrowserClient browserClient; + public TCLIService.Iface getClient() { return client; } + /** * Get all direct HiveServer2 URLs from a ZooKeeper based HiveServer2 URL * @param zookeeperBasedHS2Url @@ -279,12 +284,26 @@ public class HiveConnection implements java.sql.Connection { sessConfMap = null; isEmbeddedMode = true; fetchSize = 50; + fetchThreads = 0; } public HiveConnection(String uri, Properties info) throws SQLException { this(uri, info, HiveJdbcBrowserClientFactory.get()); } + /** + * Create a new connection that shares the same session ID as the current connection. + */ + public HiveConnection(HiveConnection hiveConnection) throws SQLException { + this(hiveConnection.getConnectedUrl(), hiveConnection.getClientInfo(), HiveJdbcBrowserClientFactory.get(), false); + // These are set/updated when the session is established. + this.sessHandle = hiveConnection.sessHandle; + this.connParams = hiveConnection.connParams; + this.protocol = hiveConnection.protocol; + this.fetchSize = hiveConnection.fetchSize; + this.fetchThreads = hiveConnection.fetchThreads; + } + @VisibleForTesting protected int getNumRetries() { return maxRetries; @@ -293,6 +312,12 @@ public class HiveConnection implements java.sql.Connection { @VisibleForTesting protected HiveConnection(String uri, Properties info, IJdbcBrowserClientFactory browserClientFactory) throws SQLException { + this(uri, info, browserClientFactory, true); + } + + protected HiveConnection(String uri, Properties info, + IJdbcBrowserClientFactory browserClientFactory, + boolean initSession) throws SQLException { try { connParams = Utils.parseURL(uri, info); } catch (ZooKeeperHiveClientException e) { @@ -311,7 +336,7 @@ public class HiveConnection implements java.sql.Connection { // Ensure UserGroupInformation includes any authorized Kerberos principals. LOG.debug("Configuring Kerberos mode"); Configuration config = new Configuration(); - config.set("hadoop.security.authentication", "Kerberos"); + config.set("hadoop.security.authentication", AuthTypes.KERBEROS.getAuthName()); UserGroupInformation.setConfiguration(config); if (isEnableCanonicalHostnameCheck()) { @@ -365,8 +390,10 @@ public class HiveConnection implements java.sql.Connection { throw new SQLException(new IllegalArgumentException( "Browser mode is not supported in embedded mode")); } - openSession(); - executeInitSql(); + if (initSession) { + openSession(); + executeInitSql(); + } } else { long retryInterval = 1000L; try { @@ -388,8 +415,10 @@ public class HiveConnection implements java.sql.Connection { // set up the client client = new TCLIService.Client(new TBinaryProtocol(transport)); // open client session - openSession(); - executeInitSql(); + if (initSession) { + openSession(); + executeInitSql(); + } break; } catch (Exception e) { @@ -1255,10 +1284,10 @@ public class HiveConnection implements java.sql.Connection { protocol = openResp.getServerProtocolVersion(); sessHandle = openResp.getSessionHandle(); - ConfVars confVars = ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE; - int serverFetchSize = Optional.ofNullable(openResp.getConfiguration().get(confVars.varname)) + ConfVars fetchSizeConf = ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE; + int serverFetchSize = Optional.ofNullable(openResp.getConfiguration().get(fetchSizeConf.varname)) .map(size -> Integer.parseInt(size)) - .orElse(confVars.defaultIntVal); + .orElse(fetchSizeConf.defaultIntVal); if (serverFetchSize <= 0) { throw new IllegalStateException("Default fetch size must be greater than 0"); } @@ -1266,6 +1295,19 @@ public class HiveConnection implements java.sql.Connection { .map(size -> Integer.parseInt(size)) .filter(v -> v > 0) .orElse(serverFetchSize); + + ConfVars fetchThreadsConf = ConfVars.HIVE_JDBC_FETCH_THREADS; + int serverFetchThreads = Optional.ofNullable(openResp.getConfiguration().get(fetchThreadsConf.varname)) + .map(size -> Integer.parseInt(size)) + .orElse(fetchThreadsConf.defaultIntVal); + if (serverFetchThreads <= 0) { + throw new IllegalStateException("Default fetch threads must be >= 0"); + } + this.fetchThreads = Optional.ofNullable(sessConfMap.get(JdbcConnectionParams.FETCH_THREADS)) + .map(size -> Integer.parseInt(size)) + .filter(v -> v >= 0) + .orElse(serverFetchThreads); + } /** @@ -1582,7 +1624,7 @@ public class HiveConnection implements java.sql.Connection { if (isClosed) { throw new SQLException("Can't create Statement, connection is closed"); } - return new HiveStatement(this, client, sessHandle, false, fetchSize); + return new HiveStatement(this, client, sessHandle, false, fetchSize, fetchThreads); } /* @@ -1605,7 +1647,7 @@ public class HiveConnection implements java.sql.Connection { if (isClosed) { throw new SQLException("Connection is closed"); } - return new HiveStatement(this, client, sessHandle, resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE, fetchSize); + return new HiveStatement(this, client, sessHandle, resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE); } /* diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index 375d1165248..874dfb55301 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -26,10 +26,19 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; + import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hive.service.cli.RowSet; @@ -65,10 +74,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet { public static final Logger LOG = LoggerFactory.getLogger(HiveQueryResultSet.class); + private Connection connection; private TCLIService.Iface client; private TOperationHandle stmtHandle; private TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT; - private boolean check_operation_status; + private boolean checkOperationStatus; private int maxRows; private int fetchSize; private long rowsFetched = 0; @@ -82,10 +92,29 @@ public class HiveQueryResultSet extends HiveBaseResultSet { private final TProtocolVersion protocol; + ExecutorService pool = null; + AtomicBoolean hasStartRow = new AtomicBoolean(false); + int fetchThreads = 1; + int threadsStarted = 0; + + private class FetchResult { + Exception ex; + RowSet fetchedRows; + boolean hasMoreRows; + long startRow; + int numRows; + } + + BlockingQueue<FetchResult> resultQueue; + AtomicLong nextStartRow = new AtomicLong(1L); + AtomicReference<InterruptedException> interruptException = new AtomicReference<>(); + AtomicBoolean gotLastBatch = new AtomicBoolean(false); + AtomicBoolean poolDone = new AtomicBoolean(false); + public static class Builder { - private final Connection connection; private final Statement statement; + private Connection connection = null; private TCLIService.Iface client = null; private TOperationHandle stmtHandle = null; @@ -100,6 +129,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet { private List<String> colTypes; private List<JdbcColumnAttributes> colAttributes; private int fetchSize = 50; + private int fetchThreads = 1; private boolean emptyResultSet = false; private boolean isScrollable = false; @@ -113,6 +143,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet { this.connection = connection; } + public Builder setConnection(Connection connection) { + this.connection = connection; + return this; + } + public Builder setClient(TCLIService.Iface client) { this.client = client; return this; @@ -149,6 +184,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet { return this; } + public Builder setFetchThreads(int fetchThreads) { + this.fetchThreads = fetchThreads; + return this; + } + public Builder setEmptyResultSet(boolean emptyResultSet) { this.emptyResultSet = emptyResultSet; return this; @@ -171,8 +211,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet { protected HiveQueryResultSet(Builder builder) throws SQLException { this.statement = builder.statement; this.client = builder.client; + this.connection = builder.connection; this.stmtHandle = builder.stmtHandle; this.fetchSize = builder.fetchSize; + this.fetchThreads = builder.fetchThreads; columnNames = new ArrayList<String>(); normalizedColumnNames = new ArrayList<String>(); columnTypes = new ArrayList<String>(); @@ -184,10 +226,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet { } this.emptyResultSet = builder.emptyResultSet; this.maxRows = builder.maxRows; - check_operation_status = (statement instanceof HiveStatement); + checkOperationStatus = (statement instanceof HiveStatement); this.isScrollable = builder.isScrollable; this.protocol = builder.getProtocolVersion(); - InitEmptyIterator(); + initEmptyIterator(); + resultQueue = new ArrayBlockingQueue<>(Math.max(fetchThreads, 1)); } /** @@ -272,7 +315,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet { colNames.forEach(i -> normalizedColumnNames.add(i.toLowerCase())); } - private void InitEmptyIterator() throws SQLException { + private void initEmptyIterator() throws SQLException { try { fetchedRows = RowSetFactory.create(new TRowSet(), protocol); fetchedRowsItr = fetchedRows.iterator(); @@ -283,6 +326,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet { @Override public void close() throws SQLException { + shutdownPool(); if (this.statement != null && (this.statement instanceof HiveStatement)) { /* * HIVE-25203: Be aware that a ResultSet is not supposed to control its parent Statement's @@ -300,7 +344,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet { client = null; stmtHandle = null; isClosed = true; - InitEmptyIterator(); + initEmptyIterator(); } private void closeOperationHandle(TOperationHandle stmtHandle) throws SQLException { @@ -317,6 +361,16 @@ public class HiveQueryResultSet extends HiveBaseResultSet { } } + private void closeConn(HiveConnection conn) { + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + LOG.debug("Error closing connection {}", e.toString()); + } + } + } + private boolean nextRowBatch() throws SQLException { if (isClosed) { throw new SQLException("Resultset is closed"); @@ -324,27 +378,148 @@ public class HiveQueryResultSet extends HiveBaseResultSet { if ((maxRows > 0 && rowsFetched >= maxRows) || emptyResultSet || fetchDone) { return false; } - if (check_operation_status) { - TGetOperationStatusResp operationStatus = - ((HiveStatement) statement).waitForOperationToComplete(); - check_operation_status = false; - } - - try { - int fetchSizeBounded = fetchSize; - if (maxRows > 0 && rowsFetched + fetchSize > maxRows) { - fetchSizeBounded = maxRows - (int)rowsFetched; + if (checkOperationStatus) { + ((HiveStatement) statement).waitForOperationToComplete(); + checkOperationStatus = false; + } + +// TODO: Could support pool with maxRows by bounding results instead + if (rowsFetched < fetchSize || fetchThreads == 0 || maxRows > 0) { + try { + int fetchSizeBounded = fetchSize; + if (maxRows > 0 && rowsFetched + fetchSize > maxRows) { + fetchSizeBounded = maxRows - (int)rowsFetched; + } + TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, + orientation, fetchSizeBounded); + TFetchResultsResp fetchResp = client.FetchResults(fetchReq); + Utils.verifySuccessWithInfo(fetchResp.getStatus()); + TRowSet results = fetchResp.getResults(); + if (results.getStartRowOffset() > 0) { + hasStartRow.set(true); + } + fetchedRows = RowSetFactory.create(results, protocol); + fetchDone = !fetchResp.isHasMoreRows() && fetchedRows.numRows() == 0; + if (fetchDone) { + gotLastBatch.set(true); + } + fetchedRows = RowSetFactory.create(results, protocol); + nextStartRow.set(results.getStartRowOffset() + 1 + fetchedRows.numRows()); + } catch (TException ex) { + throw new SQLException("Error retrieving next row", ex); + } + } else { + if (!gotLastBatch.get()) { + if (pool == null) { + pool = Executors.newFixedThreadPool(fetchThreads); + } + // Add another thread on each row batch up to the limit + if (threadsStarted < (hasStartRow.get() ? fetchThreads : 1)) { + final boolean useMainClient = (threadsStarted == 0); + threadsStarted++; + pool.execute(()-> { + LOG.debug("Started thread {}", Thread.currentThread().getName()); + + TCLIService.Iface fetchClient = null; + HiveConnection threadConn = null; + long startTime = System.nanoTime(); + + while (!gotLastBatch.get() && !poolDone.get()) { + if (threadConn != connection) { + long endTime = System.nanoTime(); + // Re-open cloned connections every 5 sec to avoid starvation + if (endTime - startTime > 5000000000L) { + closeConn(threadConn); + threadConn = null; + startTime = endTime; + } + } + if (threadConn == null) { + if (useMainClient) { + threadConn = (HiveConnection)connection; + fetchClient = client; + } else { + try { + threadConn = new HiveConnection((HiveConnection)connection); + fetchClient = threadConn.getClient(); + } catch (SQLException e) { + LOG.debug("Multi-stream connection error {}", e.toString()); + return; + } + } + } + FetchResult result = new FetchResult(); + try { + final TFetchResultsReq fetchReq = new TFetchResultsReq( + stmtHandle, orientation, fetchSize); + TFetchResultsResp fetchResp = fetchClient.FetchResults(fetchReq); + Utils.verifySuccessWithInfo(fetchResp.getStatus()); + TRowSet results = fetchResp.getResults(); + if (results.getStartRowOffset() > 0) { + hasStartRow.set(true); + } + result.fetchedRows = RowSetFactory.create(results, protocol); + result.numRows = result.fetchedRows.numRows(); + boolean hasMoreRows = result.numRows > 0 || fetchResp.isHasMoreRows(); + if (!hasMoreRows) { + gotLastBatch.set(true); + } + result.hasMoreRows = hasMoreRows; + result.fetchedRows = RowSetFactory.create(results, protocol); + result.startRow = results.getStartRowOffset() + 1; + if (hasStartRow.get() && result.startRow < nextStartRow.get()) { + throw new SQLException("Unexpected row offset"); + } + } catch (Exception e) { + result.ex = e; + } + + try { + // Wait for earlier row sets to be added to the queue + synchronized(nextStartRow) { + if (!poolDone.get()) { + if (result.ex == null) { + if (hasStartRow.get()) { + while (nextStartRow.get() != result.startRow) { + nextStartRow.wait(); + } + nextStartRow.set(result.startRow + result.numRows); + } + poolDone.set(!result.hasMoreRows); + } else { + poolDone.set(true); + } + resultQueue.put(result); + if (hasStartRow.get()) { + nextStartRow.notifyAll(); + } + } + } + } catch (InterruptedException e) { + interruptException.set(e); + break; + } + } + if (threadConn != connection) { + closeConn(threadConn); + } + }); + } + } + try { + if (interruptException.get() != null) { + throw interruptException.get(); + } + FetchResult result = resultQueue.take(); + fetchDone = !result.hasMoreRows; + if (result.ex != null) { + shutdownPool(); + throw new SQLException(result.ex); + } + fetchedRows = result.fetchedRows; + } catch (InterruptedException e) { + throw new SQLException(e); } - TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, - orientation, fetchSizeBounded); - TFetchResultsResp fetchResp = client.FetchResults(fetchReq); - Utils.verifySuccessWithInfo(fetchResp.getStatus()); - fetchDone = !fetchResp.isHasMoreRows(); - - fetchedRows = RowSetFactory.create(fetchResp.getResults(), protocol); - } catch (TException ex) { - ex.printStackTrace(); - throw new SQLException("Error retrieving next row", ex); } orientation = TFetchOrientation.FETCH_NEXT; @@ -353,6 +528,34 @@ public class HiveQueryResultSet extends HiveBaseResultSet { return fetchedRowsItr.hasNext(); } + void drainQueue() throws SQLException { + FetchResult result; + while ((result = resultQueue.poll()) != null) { + if (result.ex != null) { + throw new SQLException(result.ex); + } + } + } + + void shutdownPool() throws SQLException { + if (pool != null) { + poolDone.set(true); + drainQueue(); + pool.shutdownNow(); + try { + while (!pool.awaitTermination(1, TimeUnit.SECONDS)) { + drainQueue(); + LOG.debug("Slow fetch thread shutdown"); + } + } catch (InterruptedException e) { + throw new SQLException(e); + } + drainQueue(); + pool = null; + threadsStarted = 0; + } + } + /** * Moves the cursor down one row from its current position. * @@ -361,8 +564,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet { * if a database access error occurs. */ public boolean next() throws SQLException { - if (!fetchedRowsItr.hasNext() && !nextRowBatch()) { - return false; + while (!fetchedRowsItr.hasNext()) { + if (!nextRowBatch()) { + return false; + } } row = fetchedRowsItr.next(); rowsFetched++; @@ -431,10 +636,14 @@ public class HiveQueryResultSet extends HiveBaseResultSet { } // If we are asked to start from begining, clear the current fetched resultset - InitEmptyIterator(); + shutdownPool(); + initEmptyIterator(); orientation = TFetchOrientation.FETCH_FIRST; rowsFetched = 0; + nextStartRow.set(1L); fetchDone = false; + poolDone.set(false); + gotLastBatch.set(false); } @Override diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 506cfb30449..aba982670ac 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -76,6 +76,7 @@ public class HiveStatement implements java.sql.Statement { private final TSessionHandle sessHandle; Map<String, String> sessConf = new HashMap<>(); private int fetchSize; + private int fetchThreads; private final boolean isScrollableResultset; private boolean isOperationComplete = false; private boolean closeOnResultSetCompletion = false; @@ -126,11 +127,21 @@ public class HiveStatement implements java.sql.Statement { public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle) { - this(connection, client, sessHandle, false, connection.fetchSize); + this(connection, client, sessHandle, false, connection.fetchSize, connection.fetchThreads); + } + + public HiveStatement(HiveConnection connection, TCLIService.Iface client, + TSessionHandle sessHandle, boolean isScrollableResultset) { + this(connection, client, sessHandle, isScrollableResultset, connection.fetchSize, connection.fetchThreads); } public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle, boolean isScrollableResultset, int fetchSize) { + this(connection, client, sessHandle, isScrollableResultset, fetchSize, connection.fetchThreads); + } + + public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle, + boolean isScrollableResultset, int fetchSize, int fetchThreads) { this.connection = Objects.requireNonNull(connection); this.client = Objects.requireNonNull(client); this.sessHandle = Objects.requireNonNull(sessHandle); @@ -142,6 +153,7 @@ public class HiveStatement implements java.sql.Statement { this.isScrollableResultset = isScrollableResultset; this.inPlaceUpdateStream = Optional.empty(); this.stmtHandle = Optional.empty(); + this.fetchThreads = fetchThreads; } @Override @@ -293,6 +305,7 @@ public class HiveStatement implements java.sql.Statement { } resultSet = new HiveQueryResultSet.Builder(this).setClient(client) .setStmtHandle(stmtHandle.get()).setMaxRows(maxRows).setFetchSize(fetchSize) + .setConnection(connection).setFetchThreads(fetchThreads) .setScrollable(isScrollableResultset) .build(); return true; @@ -321,6 +334,7 @@ public class HiveStatement implements java.sql.Statement { } resultSet = new HiveQueryResultSet.Builder(this).setClient(client) + .setConnection(connection).setFetchThreads(fetchThreads) .setStmtHandle(stmtHandle.get()).setMaxRows(maxRows) .setFetchSize(fetchSize).setScrollable(isScrollableResultset) .build(); diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index 87007e9e4ba..a4230a4699f 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -166,6 +166,7 @@ public class Utils { // Set the fetchSize static final String FETCH_SIZE = "fetchSize"; static final String INIT_FILE = "initFile"; + static final String FETCH_THREADS = "fetchThreads"; static final String WM_POOL = "wmPool"; // Cookie prefix static final String HTTP_COOKIE_PREFIX = "http.cookie."; @@ -220,6 +221,16 @@ public class Utils { public static final String HIVE_DEFAULT_NULLS_LAST_KEY = HIVE_CONF_PREFIX + HiveConf.ConfVars.HIVE_DEFAULT_NULLS_LAST.varname; + private static String getFetchThreadsVarname() { + try { + return HiveConf.ConfVars.HIVE_JDBC_FETCH_THREADS.varname; + } catch(java.lang.NoSuchFieldError e) { + return "hive.jdbc.fetch.threads"; + } + } + public static final String HIVE_HIVE_JDBC_FETCH_THREADS_KEY = + HIVE_CONF_PREFIX + getFetchThreadsVarname(); + public JdbcConnectionParams() { }
