This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 7871199a49c692deef70628bc12ef022f4099bd3
Author: Kurt Deschler <[email protected]>
AuthorDate: Sat Sep 16 20:53:46 2023 -0500

    HIVE-27872: Support multi-stream parallel fetch in JDBC driver (Kurt Deschler, reviewed by Attila Turoczy, Denys Kuzmenko)
    
    This patch enables the JDBC driver to open multiple sockets to an HS2
    service and perform concurrent result fetches for a single query. This
    can significantly speed up fetching of large results that are
    bottlenecked on Thrift serialization, deserialization, and string
    conversion. With adequate threads, fetch performance is limited only by
    single-threaded client-side result processing and server-side row
    materialization.
    
    Added JDBC client parameter fetchThreads to control the number of
    threads allocated for fetching. Setting fetchThreads=1 pipelines
    fetches asynchronously over the existing connection. Setting
    fetchThreads>1 opens an additional Thrift connection to the server for
    each thread. Care should be taken not to over-allocate connections to
    the server.
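
    For illustration, a minimal client-side sketch (host, port, credentials,
    and table are placeholders; fetchThreads is passed as a session variable
    in the JDBC URL, alongside parameters such as fetchSize):

        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.ResultSet;
        import java.sql.Statement;

        public class ParallelFetchExample {
          public static void main(String[] args) throws Exception {
            // fetchThreads=4: up to four Thrift connections fetch row batches
            // for one query; rows are still consumed in order on this thread.
            String url = "jdbc:hive2://localhost:10000/default;fetchThreads=4";
            try (Connection conn = DriverManager.getConnection(url, "hive", "");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM big_table")) {
              long rows = 0;
              while (rs.next()) {
                rows++;
              }
              System.out.println("Fetched " + rows + " rows");
            }
          }
        }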
    
    Added new HiveConf parameter hive.jdbc.fetch.threads to allow
    fetchThreads to be configured from the server config.
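
    A sketch of how that server-side default can be inspected (assumes the
    standard HiveConf API; values set in hive-site.xml are advertised to
    clients in the OpenSession response):

        import org.apache.hadoop.hive.conf.HiveConf;

        public class FetchThreadsDefault {
          public static void main(String[] args) {
            HiveConf conf = new HiveConf();
            // hive.jdbc.fetch.threads defaults to 1; a fetchThreads URL
            // parameter on the client overrides the advertised value.
            int threads = conf.getIntVar(HiveConf.ConfVars.HIVE_JDBC_FETCH_THREADS);
            System.out.println("fetch threads default: " + threads);
          }
        }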
    
    Closes #4902
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   2 +
 .../java/org/apache/hive/jdbc/HiveConnection.java  |  62 ++++-
 .../org/apache/hive/jdbc/HiveQueryResultSet.java   | 267 ++++++++++++++++++---
 .../java/org/apache/hive/jdbc/HiveStatement.java   |  16 +-
 jdbc/src/java/org/apache/hive/jdbc/Utils.java      |  11 +
 5 files changed, 318 insertions(+), 40 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cf564da18d4..9c04dc061b9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1960,6 +1960,8 @@ public class HiveConf extends Configuration {
     HIVE_ENABLE_JDBC_SAFE_PUSHDOWN("hive.jdbc.pushdown.safe.enable", false,
         "Flag to control enabling pushdown of operators using Calcite that 
prevent splitting results\n" +
         "retrieval in the JDBC storage handler"),
+    HIVE_JDBC_FETCH_THREADS("hive.jdbc.fetch.threads", 1,
+        "Controls the number of thread/connections used to fetch results for a 
JDBC query"),
 
     // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
     // need to remove by hive .13. Also, do not change default (see SMB operator)
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index 015a9f95cd6..b90a9c987f0 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -19,6 +19,7 @@
 package org.apache.hive.jdbc;
 
 import static org.apache.hadoop.hive.conf.Constants.MODE;
+import static org.apache.hive.service.auth.HiveAuthConstants.AuthTypes;
 import static org.apache.hive.service.cli.operation.hplsql.HplSqlQueryExecutor.HPLSQL;
 
 import java.io.BufferedReader;
@@ -106,6 +107,7 @@ import org.apache.hive.service.auth.HiveAuthConstants;
 import org.apache.hive.service.auth.KerberosSaslHelper;
 import org.apache.hive.service.auth.PlainSaslHelper;
 import org.apache.hive.service.auth.SaslQOP;
+
 import org.apache.hive.service.cli.session.SessionUtils;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
@@ -177,6 +179,7 @@ public class HiveConnection implements java.sql.Connection {
   private int loginTimeout = 0;
   private TProtocolVersion protocol;
   int fetchSize;
+  int fetchThreads;
   private String initFile = null;
   private String wmPool = null, wmApp = null;
   private Properties clientInfo;
@@ -184,6 +187,8 @@ public class HiveConnection implements java.sql.Connection {
   private int maxRetries = 1;
   private IJdbcBrowserClient browserClient;
 
+  public TCLIService.Iface getClient() { return client; }
+
   /**
    * Get all direct HiveServer2 URLs from a ZooKeeper based HiveServer2 URL
    * @param zookeeperBasedHS2Url
@@ -279,12 +284,26 @@ public class HiveConnection implements java.sql.Connection {
     sessConfMap = null;
     isEmbeddedMode = true;
     fetchSize = 50;
+    fetchThreads = 0;
   }
 
   public HiveConnection(String uri, Properties info) throws SQLException {
     this(uri, info, HiveJdbcBrowserClientFactory.get());
   }
 
+  /**
+   * Create a new connection that shares the same session ID as the current connection.
+   */
+  public HiveConnection(HiveConnection hiveConnection) throws SQLException {
+    this(hiveConnection.getConnectedUrl(), hiveConnection.getClientInfo(), HiveJdbcBrowserClientFactory.get(), false);
+    // These are set/updated when the session is established.
+    this.sessHandle = hiveConnection.sessHandle;
+    this.connParams = hiveConnection.connParams;
+    this.protocol = hiveConnection.protocol;
+    this.fetchSize = hiveConnection.fetchSize;
+    this.fetchThreads = hiveConnection.fetchThreads;
+  }
+
   @VisibleForTesting
   protected int getNumRetries() {
     return maxRetries;
@@ -293,6 +312,12 @@ public class HiveConnection implements java.sql.Connection {
   @VisibleForTesting
   protected HiveConnection(String uri, Properties info,
       IJdbcBrowserClientFactory browserClientFactory) throws SQLException {
+    this(uri, info, browserClientFactory, true);
+  }
+
+  protected HiveConnection(String uri, Properties info,
+      IJdbcBrowserClientFactory browserClientFactory,
+      boolean initSession) throws SQLException {
     try {
       connParams = Utils.parseURL(uri, info);
     } catch (ZooKeeperHiveClientException e) {
@@ -311,7 +336,7 @@ public class HiveConnection implements java.sql.Connection {
       // Ensure UserGroupInformation includes any authorized Kerberos principals.
       LOG.debug("Configuring Kerberos mode");
       Configuration config = new Configuration();
-      config.set("hadoop.security.authentication", "Kerberos");
+      config.set("hadoop.security.authentication", 
AuthTypes.KERBEROS.getAuthName());
       UserGroupInformation.setConfiguration(config);
 
       if (isEnableCanonicalHostnameCheck()) {
@@ -365,8 +390,10 @@ public class HiveConnection implements java.sql.Connection {
         throw new SQLException(new IllegalArgumentException(
             "Browser mode is not supported in embedded mode"));
       }
-      openSession();
-      executeInitSql();
+      if (initSession) {
+        openSession();
+        executeInitSql();
+      }
     } else {
       long retryInterval = 1000L;
       try {
@@ -388,8 +415,10 @@ public class HiveConnection implements java.sql.Connection {
           // set up the client
           client = new TCLIService.Client(new TBinaryProtocol(transport));
           // open client session
-          openSession();
-          executeInitSql();
+          if (initSession) {
+            openSession();
+            executeInitSql();
+          }
 
           break;
         } catch (Exception e) {
@@ -1255,10 +1284,10 @@ public class HiveConnection implements java.sql.Connection {
     protocol = openResp.getServerProtocolVersion();
     sessHandle = openResp.getSessionHandle();
 
-    ConfVars confVars = ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE;
-    int serverFetchSize = Optional.ofNullable(openResp.getConfiguration().get(confVars.varname))
+    ConfVars fetchSizeConf = ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE;
+    int serverFetchSize = Optional.ofNullable(openResp.getConfiguration().get(fetchSizeConf.varname))
         .map(size -> Integer.parseInt(size))
-        .orElse(confVars.defaultIntVal);
+        .orElse(fetchSizeConf.defaultIntVal);
     if (serverFetchSize <= 0) {
       throw new IllegalStateException("Default fetch size must be greater than 0");
     }
@@ -1266,6 +1295,19 @@ public class HiveConnection implements java.sql.Connection {
         .map(size -> Integer.parseInt(size))
         .filter(v -> v > 0)
         .orElse(serverFetchSize);
+
+    ConfVars fetchThreadsConf = ConfVars.HIVE_JDBC_FETCH_THREADS;
+    int serverFetchThreads = Optional.ofNullable(openResp.getConfiguration().get(fetchThreadsConf.varname))
+        .map(size -> Integer.parseInt(size))
+        .orElse(fetchThreadsConf.defaultIntVal);
+    if (serverFetchThreads <= 0) {
+      throw new IllegalStateException("Default fetch threads must be >= 0");
+    }
+    this.fetchThreads = Optional.ofNullable(sessConfMap.get(JdbcConnectionParams.FETCH_THREADS))
+        .map(size -> Integer.parseInt(size))
+        .filter(v -> v >= 0)
+        .orElse(serverFetchThreads);
+
   }
 
   /**
@@ -1582,7 +1624,7 @@ public class HiveConnection implements java.sql.Connection {
     if (isClosed) {
       throw new SQLException("Can't create Statement, connection is closed");
     }
-    return new HiveStatement(this, client, sessHandle, false, fetchSize);
+    return new HiveStatement(this, client, sessHandle, false, fetchSize, fetchThreads);
   }
 
   /*
@@ -1605,7 +1647,7 @@ public class HiveConnection implements java.sql.Connection {
     if (isClosed) {
       throw new SQLException("Connection is closed");
     }
-    return new HiveStatement(this, client, sessHandle, resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE, fetchSize);
+    return new HiveStatement(this, client, sessHandle, resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE);
   }
 
   /*
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
index 375d1165248..874dfb55301 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
@@ -26,10 +26,19 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hive.service.cli.RowSet;
@@ -65,10 +74,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
 
  public static final Logger LOG = LoggerFactory.getLogger(HiveQueryResultSet.class);
 
+  private Connection connection;
   private TCLIService.Iface client;
   private TOperationHandle stmtHandle;
   private TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
-  private boolean check_operation_status;
+  private boolean checkOperationStatus;
   private int maxRows;
   private int fetchSize;
   private long rowsFetched = 0;
@@ -82,10 +92,29 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
 
   private final TProtocolVersion protocol;
 
+  ExecutorService pool = null;
+  AtomicBoolean hasStartRow = new AtomicBoolean(false);
+  int fetchThreads = 1;
+  int threadsStarted = 0;
+
+  private class FetchResult {
+    Exception ex;
+    RowSet fetchedRows;
+    boolean hasMoreRows;
+    long startRow;
+    int numRows;
+  }
+
+  BlockingQueue<FetchResult> resultQueue;
+  AtomicLong nextStartRow = new AtomicLong(1L);
+  AtomicReference<InterruptedException> interruptException = new AtomicReference<>();
+  AtomicBoolean gotLastBatch = new AtomicBoolean(false);
+  AtomicBoolean poolDone = new AtomicBoolean(false);
+
   public static class Builder {
 
-    private final Connection connection;
     private final Statement statement;
+    private Connection connection = null;
     private TCLIService.Iface client = null;
     private TOperationHandle stmtHandle = null;
 
@@ -100,6 +129,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     private List<String> colTypes;
     private List<JdbcColumnAttributes> colAttributes;
     private int fetchSize = 50;
+    private int fetchThreads = 1;
     private boolean emptyResultSet = false;
     private boolean isScrollable = false;
 
@@ -113,6 +143,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
       this.connection = connection;
     }
 
+    public Builder setConnection(Connection connection) {
+      this.connection = connection;
+      return this;
+    }
+
     public Builder setClient(TCLIService.Iface client) {
       this.client = client;
       return this;
@@ -149,6 +184,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
       return this;
     }
 
+    public Builder setFetchThreads(int fetchThreads) {
+      this.fetchThreads = fetchThreads;
+      return this;
+    }
+
     public Builder setEmptyResultSet(boolean emptyResultSet) {
       this.emptyResultSet = emptyResultSet;
       return this;
@@ -171,8 +211,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
   protected HiveQueryResultSet(Builder builder) throws SQLException {
     this.statement = builder.statement;
     this.client = builder.client;
+    this.connection = builder.connection;
     this.stmtHandle = builder.stmtHandle;
     this.fetchSize = builder.fetchSize;
+    this.fetchThreads = builder.fetchThreads;
     columnNames = new ArrayList<String>();
     normalizedColumnNames = new ArrayList<String>();
     columnTypes = new ArrayList<String>();
@@ -184,10 +226,11 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     }
     this.emptyResultSet = builder.emptyResultSet;
     this.maxRows = builder.maxRows;
-    check_operation_status = (statement instanceof HiveStatement);
+    checkOperationStatus = (statement instanceof HiveStatement);
     this.isScrollable = builder.isScrollable;
     this.protocol = builder.getProtocolVersion();
-    InitEmptyIterator();
+    initEmptyIterator();
+    resultQueue = new ArrayBlockingQueue<>(Math.max(fetchThreads, 1));
   }
 
   /**
@@ -272,7 +315,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     colNames.forEach(i -> normalizedColumnNames.add(i.toLowerCase()));
   }
 
-  private void InitEmptyIterator() throws SQLException {
+  private void initEmptyIterator() throws SQLException {
     try {
       fetchedRows = RowSetFactory.create(new TRowSet(), protocol);
       fetchedRowsItr = fetchedRows.iterator();
@@ -283,6 +326,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
 
   @Override
   public void close() throws SQLException {
+    shutdownPool();
     if (this.statement != null && (this.statement instanceof HiveStatement)) {
       /*
       * HIVE-25203: Be aware that a ResultSet is not supposed to control its parent Statement's
@@ -300,7 +344,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     client = null;
     stmtHandle = null;
     isClosed = true;
-    InitEmptyIterator();
+    initEmptyIterator();
   }
 
   private void closeOperationHandle(TOperationHandle stmtHandle) throws SQLException {
@@ -317,6 +361,16 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     }
   }
 
+  private void closeConn(HiveConnection conn) {
+    if (conn != null) {
+      try {
+        conn.close();
+      } catch (SQLException e) {
+        LOG.debug("Error closing connection {}", e.toString());
+      }
+    }
+  }
+
   private boolean nextRowBatch() throws SQLException {
     if (isClosed) {
       throw new SQLException("Resultset is closed");
@@ -324,27 +378,148 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     if ((maxRows > 0 && rowsFetched >= maxRows) || emptyResultSet || fetchDone) {
       return false;
     }
-    if (check_operation_status) {
-      TGetOperationStatusResp operationStatus =
-        ((HiveStatement) statement).waitForOperationToComplete();
-      check_operation_status = false;
-    }
-
-    try {
-      int fetchSizeBounded = fetchSize;
-      if (maxRows > 0 && rowsFetched + fetchSize > maxRows) {
-        fetchSizeBounded = maxRows - (int)rowsFetched;
+    if (checkOperationStatus) {
+      ((HiveStatement) statement).waitForOperationToComplete();
+      checkOperationStatus = false;
+    }
+
+    // TODO: Could support pool with maxRows by bounding results instead
+    if (rowsFetched < fetchSize || fetchThreads == 0 || maxRows > 0) {
+      try {
+        int fetchSizeBounded = fetchSize;
+        if (maxRows > 0 && rowsFetched + fetchSize > maxRows) {
+          fetchSizeBounded = maxRows - (int)rowsFetched;
+        }
+        TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
+            orientation, fetchSizeBounded);
+        TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+        Utils.verifySuccessWithInfo(fetchResp.getStatus());
+        TRowSet results = fetchResp.getResults();
+        if (results.getStartRowOffset() > 0) {
+          hasStartRow.set(true);
+        }
+        fetchedRows = RowSetFactory.create(results, protocol);
+        fetchDone = !fetchResp.isHasMoreRows() && fetchedRows.numRows() == 0;
+        if (fetchDone) {
+          gotLastBatch.set(true);
+        }
+        fetchedRows = RowSetFactory.create(results, protocol);
+        nextStartRow.set(results.getStartRowOffset() + 1 + fetchedRows.numRows());
+      } catch (TException ex) {
+        throw new SQLException("Error retrieving next row", ex);
+      }
+    } else {
+      if (!gotLastBatch.get()) {
+        if (pool == null) {
+          pool = Executors.newFixedThreadPool(fetchThreads);
+        }
+        // Add another thread on each row batch up to the limit
+        if (threadsStarted < (hasStartRow.get() ? fetchThreads : 1)) {
+          final boolean useMainClient = (threadsStarted == 0);
+          threadsStarted++;
+          pool.execute(()-> {
+            LOG.debug("Started thread {}", Thread.currentThread().getName());
+
+            TCLIService.Iface fetchClient = null;
+            HiveConnection threadConn = null;
+            long startTime = System.nanoTime();
+
+            while (!gotLastBatch.get() && !poolDone.get()) {
+              if (threadConn != connection) {
+                long endTime = System.nanoTime();
+                // Re-open cloned connections every 5 sec to avoid starvation
+                if (endTime - startTime > 5000000000L) {
+                  closeConn(threadConn);
+                  threadConn = null;
+                  startTime = endTime;
+                }
+              }
+              if (threadConn == null) {
+                if (useMainClient) {
+                  threadConn = (HiveConnection)connection;
+                  fetchClient = client;
+                } else {
+                  try {
+                    threadConn = new HiveConnection((HiveConnection)connection);
+                    fetchClient = threadConn.getClient();
+                  } catch (SQLException e) {
+                    LOG.debug("Multi-stream connection error {}", 
e.toString());
+                    return;
+                  }
+                }
+              }
+              FetchResult result = new FetchResult();
+              try {
+                final TFetchResultsReq fetchReq = new TFetchResultsReq(
+                    stmtHandle, orientation, fetchSize);
+                TFetchResultsResp fetchResp = fetchClient.FetchResults(fetchReq);
+                Utils.verifySuccessWithInfo(fetchResp.getStatus());
+                TRowSet results = fetchResp.getResults();
+                if (results.getStartRowOffset() > 0) {
+                  hasStartRow.set(true);
+                }
+                result.fetchedRows = RowSetFactory.create(results, protocol);
+                result.numRows = result.fetchedRows.numRows();
+                boolean hasMoreRows = result.numRows > 0 || fetchResp.isHasMoreRows();
+                if (!hasMoreRows) {
+                  gotLastBatch.set(true);
+                }
+                result.hasMoreRows = hasMoreRows;
+                result.fetchedRows = RowSetFactory.create(results, protocol);
+                result.startRow = results.getStartRowOffset() + 1;
+                if (hasStartRow.get() && result.startRow < nextStartRow.get()) {
+                  throw new SQLException("Unexpected row offset");
+                }
+              } catch (Exception e) {
+                result.ex = e;
+              }
+
+              try {
+                // Wait for earlier row sets to be added to the queue
+                synchronized(nextStartRow) {
+                  if (!poolDone.get()) {
+                    if (result.ex == null) {
+                      if (hasStartRow.get()) {
+                        while (nextStartRow.get() != result.startRow) {
+                          nextStartRow.wait();
+                        }
+                        nextStartRow.set(result.startRow + result.numRows);
+                      }
+                      poolDone.set(!result.hasMoreRows);
+                    } else {
+                      poolDone.set(true);
+                    }
+                    resultQueue.put(result);
+                    if (hasStartRow.get()) {
+                      nextStartRow.notifyAll();
+                    }
+                  }
+                }
+              } catch (InterruptedException e) {
+                interruptException.set(e);
+                break;
+              }
+            }
+            if (threadConn != connection) {
+              closeConn(threadConn);
+            }
+          });
+        }
+      }
+      try {
+        if (interruptException.get() != null) {
+          throw interruptException.get();
+        }
+        FetchResult result = resultQueue.take();
+        fetchDone = !result.hasMoreRows;
+        if (result.ex != null) {
+          shutdownPool();
+          throw new SQLException(result.ex);
+        }
+        fetchedRows = result.fetchedRows;
+      } catch (InterruptedException e) {
+        throw new SQLException(e);
       }
-      TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
-          orientation, fetchSizeBounded);
-      TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
-      Utils.verifySuccessWithInfo(fetchResp.getStatus());
-      fetchDone = !fetchResp.isHasMoreRows();
-
-      fetchedRows = RowSetFactory.create(fetchResp.getResults(), protocol);
-    } catch (TException ex) {
-      ex.printStackTrace();
-      throw new SQLException("Error retrieving next row", ex);
     }
 
     orientation = TFetchOrientation.FETCH_NEXT;
@@ -353,6 +528,34 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     return fetchedRowsItr.hasNext();
   }
 
+  void drainQueue() throws SQLException {
+    FetchResult result;
+    while ((result = resultQueue.poll()) != null) {
+      if (result.ex != null) {
+        throw new SQLException(result.ex);
+      }
+    }
+  }
+
+  void shutdownPool() throws SQLException {
+    if (pool != null) {
+      poolDone.set(true);
+      drainQueue();
+      pool.shutdownNow();
+      try {
+        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+          drainQueue();
+          LOG.debug("Slow fetch thread shutdown");
+        }
+      } catch (InterruptedException e) {
+        throw new SQLException(e);
+      }
+      drainQueue();
+      pool = null;
+      threadsStarted = 0;
+    }
+  }
+
   /**
    * Moves the cursor down one row from its current position.
    *
@@ -361,8 +564,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
    *           if a database access error occurs.
    */
   public boolean next() throws SQLException {
-    if (!fetchedRowsItr.hasNext() && !nextRowBatch()) {
-      return false;
+    while (!fetchedRowsItr.hasNext()) {
+      if (!nextRowBatch()) {
+        return false;
+      }
     }
     row = fetchedRowsItr.next();
     rowsFetched++;
@@ -431,10 +636,14 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     }
 
     // If we are asked to start from begining, clear the current fetched resultset
-    InitEmptyIterator();
+    shutdownPool();
+    initEmptyIterator();
     orientation = TFetchOrientation.FETCH_FIRST;
     rowsFetched = 0;
+    nextStartRow.set(1L);
     fetchDone = false;
+    poolDone.set(false);
+    gotLastBatch.set(false);
   }
 
   @Override
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 506cfb30449..aba982670ac 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -76,6 +76,7 @@ public class HiveStatement implements java.sql.Statement {
   private final TSessionHandle sessHandle;
   Map<String, String> sessConf = new HashMap<>();
   private int fetchSize;
+  private int fetchThreads;
   private final boolean isScrollableResultset;
   private boolean isOperationComplete = false;
   private boolean closeOnResultSetCompletion = false;
@@ -126,11 +127,21 @@ public class HiveStatement implements java.sql.Statement {
 
   public HiveStatement(HiveConnection connection, TCLIService.Iface client,
       TSessionHandle sessHandle) {
-    this(connection, client, sessHandle, false, connection.fetchSize);
+    this(connection, client, sessHandle, false, connection.fetchSize, connection.fetchThreads);
+  }
+
+  public HiveStatement(HiveConnection connection, TCLIService.Iface client,
+      TSessionHandle sessHandle, boolean isScrollableResultset) {
+    this(connection, client, sessHandle, isScrollableResultset, connection.fetchSize, connection.fetchThreads);
   }
 
   public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle,
       boolean isScrollableResultset, int fetchSize) {
+    this(connection, client, sessHandle, isScrollableResultset, fetchSize, connection.fetchThreads);
+  }
+
+  public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle,
+      boolean isScrollableResultset, int fetchSize, int fetchThreads) {
     this.connection = Objects.requireNonNull(connection);
     this.client = Objects.requireNonNull(client);
     this.sessHandle = Objects.requireNonNull(sessHandle);
@@ -142,6 +153,7 @@ public class HiveStatement implements java.sql.Statement {
     this.isScrollableResultset = isScrollableResultset;
     this.inPlaceUpdateStream = Optional.empty();
     this.stmtHandle = Optional.empty();
+    this.fetchThreads = fetchThreads;
   }
 
   @Override
@@ -293,6 +305,7 @@ public class HiveStatement implements java.sql.Statement {
     }
     resultSet = new HiveQueryResultSet.Builder(this).setClient(client)
         .setStmtHandle(stmtHandle.get()).setMaxRows(maxRows).setFetchSize(fetchSize)
+        .setConnection(connection).setFetchThreads(fetchThreads)
         .setScrollable(isScrollableResultset)
         .build();
     return true;
@@ -321,6 +334,7 @@ public class HiveStatement implements java.sql.Statement {
     }
     resultSet =
         new HiveQueryResultSet.Builder(this).setClient(client)
+            .setConnection(connection).setFetchThreads(fetchThreads)
             .setStmtHandle(stmtHandle.get()).setMaxRows(maxRows)
             .setFetchSize(fetchSize).setScrollable(isScrollableResultset)
             .build();
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 87007e9e4ba..a4230a4699f 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -166,6 +166,7 @@ public class Utils {
     // Set the fetchSize
     static final String FETCH_SIZE = "fetchSize";
     static final String INIT_FILE = "initFile";
+    static final String FETCH_THREADS = "fetchThreads";
     static final String WM_POOL = "wmPool";
     // Cookie prefix
     static final String HTTP_COOKIE_PREFIX = "http.cookie.";
@@ -220,6 +221,16 @@ public class Utils {
     public static final String HIVE_DEFAULT_NULLS_LAST_KEY =
         HIVE_CONF_PREFIX + HiveConf.ConfVars.HIVE_DEFAULT_NULLS_LAST.varname;
 
+    private static String getFetchThreadsVarname() {
+      try {
+        return HiveConf.ConfVars.HIVE_JDBC_FETCH_THREADS.varname;
+      } catch(java.lang.NoSuchFieldError e) {
+        return "hive.jdbc.fetch.threads";
+      }
+    }
+    public static final String HIVE_HIVE_JDBC_FETCH_THREADS_KEY =
+        HIVE_CONF_PREFIX + getFetchThreadsVarname();
+
     public JdbcConnectionParams() {
     }
 

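The threaded fetch path in HiveQueryResultSet above preserves row order by
having each worker block until the shared nextStartRow counter reaches its
batch's start offset before publishing to the result queue. A self-contained
sketch of that ordered hand-off pattern (hypothetical names and a simulated
fetch; not the Hive classes themselves):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicLong;

    public class OrderedHandoffSketch {
      static final int BATCH = 10, BATCHES = 20, THREADS = 4;
      // Simulated server cursor handing out batch start offsets.
      static final AtomicLong serverCursor = new AtomicLong(0);
      // Next start offset allowed into the queue (mirrors nextStartRow).
      static final AtomicLong nextStartRow = new AtomicLong(0);
      static final BlockingQueue<long[]> resultQueue = new ArrayBlockingQueue<>(THREADS);

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        for (int t = 0; t < THREADS; t++) {
          pool.execute(() -> {
            long start;
            while ((start = serverCursor.getAndAdd(BATCH)) < (long) BATCHES * BATCH) {
              try {
                // Concurrent "fetch": batches can complete out of order.
                Thread.sleep(ThreadLocalRandom.current().nextInt(20));
                synchronized (nextStartRow) {
                  while (nextStartRow.get() != start) {
                    nextStartRow.wait(); // an earlier batch is not queued yet
                  }
                  nextStartRow.set(start + BATCH);
                  resultQueue.put(new long[] {start, BATCH});
                  nextStartRow.notifyAll();
                }
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
              }
            }
          });
        }
        // The consumer sees batches strictly in offset order.
        for (int i = 0; i < BATCHES; i++) {
          long[] batch = resultQueue.take();
          System.out.println("batch start=" + batch[0] + " rows=" + batch[1]);
        }
        pool.shutdown();
      }
    }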