Repository: hive
Updated Branches:
  refs/heads/master 22af0eff0 -> de2a5d63a


HIVE-14901: HiveServer2: Use user supplied fetch size to determine #rows 
serialized in tasks (Norris Lee reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/de2a5d63
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/de2a5d63
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/de2a5d63

Branch: refs/heads/master
Commit: de2a5d63af78ffb6ef97062b06b9f3eb9557e361
Parents: 22af0ef
Author: Vaibhav Gumashta <vgumas...@hortonworks.com>
Authored: Mon Mar 6 14:26:24 2017 -0800
Committer: Vaibhav Gumashta <vgumas...@hortonworks.com>
Committed: Mon Mar 6 14:26:54 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 +-
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   | 30 ++++++++++++++++++++
 .../org/apache/hive/jdbc/HiveConnection.java    | 10 +++++++
 .../serde2/thrift/ThriftJDBCBinarySerDe.java    |  4 ++-
 .../org/apache/hive/service/cli/CLIService.java |  7 ++++-
 .../hive/service/cli/CLIServiceClient.java      |  4 +--
 .../hive/service/cli/session/HiveSession.java   | 10 ++++++-
 .../service/cli/session/HiveSessionImpl.java    | 24 ++++++++++++++++
 .../service/cli/thrift/ThriftCLIService.java    | 19 +++++++++++--
 9 files changed, 103 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a8aaa5c..676c527 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2619,7 +2619,7 @@ public class HiveConf extends Configuration {
     // TODO: Make use of this config to configure fetch size
     
HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size",
         10000, "Max number of rows sent in one Fetch RPC call by the server to 
the client."),
-    
HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE("hive.server2.resultset.default.fetch.size",
 10000,
+    
HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE("hive.server2.thrift.resultset.default.fetch.size",
 1000,
         "The number of rows sent in one Fetch RPC call by the server to the 
client, if not\n" +
         "specified by the client."),
     HIVE_SERVER2_XSRF_FILTER_ENABLED("hive.server2.xsrf.filter.enabled",false,
@@ -4237,6 +4237,7 @@ public class HiveConf extends Configuration {
     "hive\\.parquet\\..*",
     "hive\\.ppd\\..*",
     "hive\\.prewarm\\..*",
+    "hive\\.server2\\.thrift\\.resultset\\.default\\.fetch\\.size",
     "hive\\.server2\\.proxy\\.user",
     "hive\\.skewjoin\\..*",
     "hive\\.smbjoin\\..*",

http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 3d4057b..afe23f8 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -1321,4 +1321,34 @@ public class TestJdbcWithMiniHS2 {
       fs.delete(testPath, true);
     }
   }
+
+  @Test
+  public void testFetchSize() throws Exception {
+    // Test setting fetch size below max
+    Connection fsConn = getConnection(miniHS2.getJdbcURL("default", 
"fetchSize=50", ""),
+      System.getProperty("user.name"), "bar");
+    Statement stmt = fsConn.createStatement();
+    stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true");
+    int fetchSize = stmt.getFetchSize();
+    assertEquals(50, fetchSize);
+    stmt.close();
+    fsConn.close();
+    // Test setting fetch size above max
+    fsConn = getConnection(
+      miniHS2.getJdbcURL(
+        "default",
+        "fetchSize=" + (miniHS2.getHiveConf().getIntVar(
+          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE) + 1),
+        ""),
+      System.getProperty("user.name"), "bar");
+    stmt = fsConn.createStatement();
+    stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true");
+    fetchSize = stmt.getFetchSize();
+    assertEquals(
+      miniHS2.getHiveConf().getIntVar(
+        HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE),
+      fetchSize);
+    stmt.close();
+    fsConn.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java 
b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index 535ad3d..ed899c6 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -639,6 +639,9 @@ public class HiveConnection implements java.sql.Connection {
     }
     // switch the database
     openConf.put("use:database", connParams.getDbName());
+    // set the fetchSize
+    
openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size",
+      Integer.toString(fetchSize));
 
     // set the session configuration
     Map<String, String> sessVars = connParams.getSessionVars();
@@ -664,6 +667,13 @@ public class HiveConnection implements java.sql.Connection 
{
       }
       protocol = openResp.getServerProtocolVersion();
       sessHandle = openResp.getSessionHandle();
+
+      // Update fetchSize if modified by server
+      String serverFetchSize =
+        
openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size");
+      if (serverFetchSize != null) {
+        fetchSize = Integer.parseInt(serverFetchSize);
+      }
     } catch (TException e) {
       LOG.error("Error opening session", e);
       throw new SQLException("Could not establish connection to "

http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
----------------------------------------------------------------------
diff --git 
a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
 
b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
index 84ed6ba..16cc916 100644
--- 
a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
+++ 
b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
@@ -71,7 +71,9 @@ public class ThriftJDBCBinarySerDe extends AbstractSerDe {
   @Override
   public void initialize(Configuration conf, Properties tbl) throws 
SerDeException {
     // Get column names
-       MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+    MAX_BUFFERED_ROWS =
+      HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
+    LOG.info("ThriftJDBCBinarySerDe max number of buffered columns: " + 
MAX_BUFFERED_ROWS);
     String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
     String columnTypeProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
     final String columnNameDelimiter = 
tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl

http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java 
b/service/src/java/org/apache/hive/service/cli/CLIService.java
index a009e25..5bb4ee0 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -81,7 +81,7 @@ public class CLIService extends CompositeService implements 
ICLIService {
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
     sessionManager = new SessionManager(hiveServer2);
-    defaultFetchRows = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE);
+    defaultFetchRows = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
     addService(sessionManager);
     //  If the hadoop cluster is secure, do a kerberos login for the service 
from the keytab
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -476,6 +476,11 @@ public class CLIService extends CompositeService 
implements ICLIService {
     return opStatus;
   }
 
+  public HiveConf getSessionConf(SessionHandle sessionHandle)
+      throws HiveSQLException {
+         return sessionManager.getSession(sessionHandle).getSessionConf();
+  }
+
   private static final long PROGRESS_MAX_WAIT_NS = 30 * 1000000000l;
   private JobProgressUpdate progressUpdateLog(boolean isProgressLogRequested, 
Operation operation) {
     if (!isProgressLogRequested || 
!ServiceUtils.canProvideProgressLog(hiveConf)

http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java 
b/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
index 4b84872..c965abc 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIServiceClient.java
@@ -31,10 +31,10 @@ import org.apache.hive.service.auth.HiveAuthFactory;
  *
  */
 public abstract class CLIServiceClient implements ICLIService {
-  protected int defaultFetchRows = 
ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal;
+  protected int defaultFetchRows = 
ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal;
 
   public CLIServiceClient(Configuration conf) {
-    defaultFetchRows = HiveConf.getIntVar(conf, 
ConfVars.HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE);
+    defaultFetchRows = HiveConf.getIntVar(conf, 
ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
   }
 
   public SessionHandle openSession(String username, String password)

http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git 
a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java 
b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index e5d865b..bd4d90d 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hive.service.auth.HiveAuthFactory;
@@ -183,7 +184,14 @@ public interface HiveSession extends HiveSessionBase {
              String primarySchema, String primaryTable, String foreignCatalog,
              String foreignSchema, String foreignTable) 
     throws HiveSQLException;
-  
+
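+  /**
+   * Returns the HiveConf that holds this session's configuration.
+   * @return the session-level HiveConf
+   * @throws HiveSQLException if the session configuration cannot be obtained
+   */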
+  HiveConf getSessionConf() throws HiveSQLException;
+
   /**
    * close the session
    * @throws HiveSQLException

http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java 
b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index f939a93..fd74d55 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -177,6 +177,9 @@ public class HiveSessionImpl implements HiveSession {
     }
     // Process global init file: .hiverc
     processGlobalInitFile();
+    // Set fetch size in session conf map
+    sessionConfMap = setFetchSize(sessionConfMap);
+
     if (sessionConfMap != null) {
       configureSession(sessionConfMap);
     }
@@ -245,6 +248,22 @@ public class HiveSessionImpl implements HiveSession {
     }
   }
 
+  private Map<String, String> setFetchSize(Map<String, String> sessionConfMap) 
{
+    int maxFetchSize =
+      
sessionConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+    String confFetchSize = sessionConfMap != null ?
+      sessionConfMap.get(
+        "set:hiveconf:" + 
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname) :
+        null;
+    if (confFetchSize != null && !confFetchSize.isEmpty()) {
+        int fetchSize = Integer.parseInt(confFetchSize);
+        sessionConfMap.put(
+          "set:hiveconf:" + 
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname,
+          Integer.toString(fetchSize > maxFetchSize ? maxFetchSize : 
fetchSize));
+    }
+    return sessionConfMap;
+  }
+
   private void configureSession(Map<String, String> sessionConfMap) throws 
HiveSQLException {
     SessionState.setCurrentSessionState(sessionState);
     for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
@@ -465,6 +484,11 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   @Override
+  public HiveConf getSessionConf() throws HiveSQLException {
+         return this.sessionConf;
+  }
+
+  @Override
   public OperationHandle executeStatement(String statement, Map<String, 
String> confOverlay) throws HiveSQLException {
     return executeStatementInternal(statement, confOverlay, false, 0);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/de2a5d63/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git 
a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java 
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 211b33b..0fdc8d9 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -317,10 +317,19 @@ public abstract class ThriftCLIService extends 
AbstractService implements TCLISe
     LOG.info("Client protocol version: " + req.getClient_protocol());
     TOpenSessionResp resp = new TOpenSessionResp();
     try {
+      Map<String, String> openConf = req.getConfiguration();
+
       SessionHandle sessionHandle = getSessionHandle(req, resp);
       resp.setSessionHandle(sessionHandle.toTSessionHandle());
-      // TODO: set real configuration map
-      resp.setConfiguration(new HashMap<String, String>());
+      Map<String, String> configurationMap = new HashMap<String, String>();
+      // Set the updated fetch size from the server into the configuration map 
for the client
+      HiveConf sessionConf = cliService.getSessionConf(sessionHandle);
+      configurationMap.put(
+        
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname,
+        Integer.toString(sessionConf != null ?
+          
sessionConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)
 :
+          
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)));
+      resp.setConfiguration(configurationMap);
       resp.setStatus(OK_STATUS);
       ThriftCLIServerContext context =
         (ThriftCLIServerContext)currentServerContext.get();
@@ -733,6 +742,12 @@ public abstract class ThriftCLIService extends 
AbstractService implements TCLISe
   public TFetchResultsResp FetchResults(TFetchResultsReq req) throws 
TException {
     TFetchResultsResp resp = new TFetchResultsResp();
     try {
+      // Set fetch size
+      int maxFetchSize =
+        
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+      if (req.getMaxRows() > maxFetchSize) {
+        req.setMaxRows(maxFetchSize);
+      }
       RowSet rowSet = cliService.fetchResults(
           new OperationHandle(req.getOperationHandle()),
           FetchOrientation.getFetchOrientation(req.getOrientation()),

Reply via email to