This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit 6166da46337a75d0131c591e4b3aa339961514d2 Author: Kurt Deschler <[email protected]> AuthorDate: Thu Nov 16 13:11:06 2023 -0500 HIVE-27873: Fix getOperationStatus and optimize fetch (Kurt Deschler, reviewed by Attila Turoczy, Denys Kuzmenko) This patch fixes a major performance issue when fetching results from Impala. The problem was that Impala does not set isHasResultSet during getOperationStatus() calls; as a result, that RPC was called, and a completion message was logged, for every row fetched. The patch also optimizes the fetch path to minimize conditional checks in the fast path. Closes #4902 --- .../org/apache/hive/jdbc/HiveQueryResultSet.java | 120 +++++++++++---------- 1 file changed, 63 insertions(+), 57 deletions(-) diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index 7acb852cb95..375d1165248 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -53,6 +53,7 @@ import org.apache.hive.service.rpc.thrift.TRowSet; import org.apache.hive.service.rpc.thrift.TTableSchema; import org.apache.hive.service.rpc.thrift.TTypeQualifierValue; import org.apache.hive.service.rpc.thrift.TTypeQualifiers; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,17 +67,18 @@ public class HiveQueryResultSet extends HiveBaseResultSet { private TCLIService.Iface client; private TOperationHandle stmtHandle; + private TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT; + private boolean check_operation_status; private int maxRows; private int fetchSize; - private int rowsFetched = 0; + private long rowsFetched = 0; + private boolean fetchDone = false; private RowSet fetchedRows; private Iterator<Object[]> fetchedRowsItr; private boolean isClosed = false; private boolean emptyResultSet = false; private boolean isScrollable = false; - private boolean fetchFirst = false; - 
private TGetOperationStatusResp operationStatus = null; private final TProtocolVersion protocol; @@ -182,11 +184,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet { } this.emptyResultSet = builder.emptyResultSet; this.maxRows = builder.maxRows; - if (builder.emptyResultSet) { - this.maxRows = 0; - } + check_operation_status = (statement instanceof HiveStatement); this.isScrollable = builder.isScrollable; this.protocol = builder.getProtocolVersion(); + InitEmptyIterator(); } /** @@ -271,6 +272,15 @@ public class HiveQueryResultSet extends HiveBaseResultSet { colNames.forEach(i -> normalizedColumnNames.add(i.toLowerCase())); } + private void InitEmptyIterator() throws SQLException { + try { + fetchedRows = RowSetFactory.create(new TRowSet(), protocol); + fetchedRowsItr = fetchedRows.iterator(); + } catch (TException e) { + throw new SQLException(e); + } + } + @Override public void close() throws SQLException { if (this.statement != null && (this.statement instanceof HiveStatement)) { @@ -290,7 +300,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet { client = null; stmtHandle = null; isClosed = true; - operationStatus = null; + InitEmptyIterator(); } private void closeOperationHandle(TOperationHandle stmtHandle) throws SQLException { @@ -307,66 +317,55 @@ public class HiveQueryResultSet extends HiveBaseResultSet { } } - /** - * Moves the cursor down one row from its current position. - * - * @see java.sql.ResultSet#next() - * @throws SQLException - * if a database access error occurs. - */ - public boolean next() throws SQLException { + private boolean nextRowBatch() throws SQLException { if (isClosed) { throw new SQLException("Resultset is closed"); } - if (emptyResultSet || (maxRows > 0 && rowsFetched >= maxRows)) { + if ((maxRows > 0 && rowsFetched >= maxRows) || emptyResultSet || fetchDone) { return false; } - - /* - * Poll on the operation status, till the operation is complete. - * We need to wait only for HiveStatement to complete. 
- * HiveDatabaseMetaData which also uses this ResultSet returns only after the RPC is complete. - */ - // when isHasResultSet is set, the query transitioned from running -> complete and is not expected go back to - // running state when fetching results (implicit state transition) - if ((statement instanceof HiveStatement) && (operationStatus == null || !operationStatus.isHasResultSet())) { - operationStatus = ((HiveStatement) statement).waitForOperationToComplete(); + if (check_operation_status) { + TGetOperationStatusResp operationStatus = + ((HiveStatement) statement).waitForOperationToComplete(); + check_operation_status = false; } try { - TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT; - if (fetchFirst) { - // If we are asked to start from beginning, clear the current fetched resultset - orientation = TFetchOrientation.FETCH_FIRST; - fetchedRows = null; - fetchedRowsItr = null; - fetchFirst = false; - } - if (fetchedRows == null || !fetchedRowsItr.hasNext()) { - TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, - orientation, fetchSize); - LOG.debug("HiveQueryResultsFetchReq: {}", fetchReq); - TFetchResultsResp fetchResp; - fetchResp = client.FetchResults(fetchReq); - Utils.verifySuccessWithInfo(fetchResp.getStatus()); - - TRowSet results = fetchResp.getResults(); - fetchedRows = RowSetFactory.create(results, protocol); - fetchedRowsItr = fetchedRows.iterator(); + int fetchSizeBounded = fetchSize; + if (maxRows > 0 && rowsFetched + fetchSize > maxRows) { + fetchSizeBounded = maxRows - (int)rowsFetched; } + TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, + orientation, fetchSizeBounded); + TFetchResultsResp fetchResp = client.FetchResults(fetchReq); + Utils.verifySuccessWithInfo(fetchResp.getStatus()); + fetchDone = !fetchResp.isHasMoreRows(); + + fetchedRows = RowSetFactory.create(fetchResp.getResults(), protocol); + } catch (TException ex) { + ex.printStackTrace(); + throw new SQLException("Error retrieving next row", 
ex); + } - if (!fetchedRowsItr.hasNext()) { - return false; - } + orientation = TFetchOrientation.FETCH_NEXT; + fetchedRowsItr = fetchedRows.iterator(); - row = fetchedRowsItr.next(); - rowsFetched++; - } catch (SQLException eS) { - throw eS; - } catch (Exception ex) { - throw new SQLException("Error retrieving next row", ex); + return fetchedRowsItr.hasNext(); + } + + /** + * Moves the cursor down one row from its current position. + * + * @see java.sql.ResultSet#next() + * @throws SQLException + * if a database access error occurs. + */ + public boolean next() throws SQLException { + if (!fetchedRowsItr.hasNext() && !nextRowBatch()) { + return false; } - // NOTE: fetchOne doesn't throw new SQLFeatureNotSupportedException("Method not supported"). + row = fetchedRowsItr.next(); + rowsFetched++; return true; } @@ -430,8 +429,12 @@ public class HiveQueryResultSet extends HiveBaseResultSet { if (!isScrollable) { throw new SQLException("Method not supported for TYPE_FORWARD_ONLY resultset"); } - fetchFirst = true; + + // If we are asked to start from begining, clear the current fetched resultset + InitEmptyIterator(); + orientation = TFetchOrientation.FETCH_FIRST; rowsFetched = 0; + fetchDone = false; } @Override @@ -444,7 +447,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet { @Override public int getRow() throws SQLException { - return rowsFetched; + if (rowsFetched > Integer.MAX_VALUE) { + throw new SQLException("getRow() result exceeds Int.MAX_VALUE"); + } + return (int)rowsFetched; } @Override
