This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c82af8d5502 [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
c82af8d5502 is described below

commit c82af8d5502d0e334fe2e628ce66435ec0e341e8
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri May 13 10:35:53 2022 +0800

    [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
    
    ### What changes were proposed in this pull request?
    
    This PR is mainly a refactoring that prepares the Thrift server to support TimestampNTZ and TimestampLTZ at the same time in the future.
    
    As is well known, Spark carries Hive dependencies that fall into two categories:
    - as a client, for accessing the Hive metastore, storage, etc. This is currently v2.3.9, and is best kept at a stable, low version so that higher-version Hive metastore servers can support it with backward compatibility
    - as a server, for being accessed by Hive JDBC/Thrift clients such as beeline. This is currently v3.1.2, and is best kept at a higher version to support more clients
    
    The problem is that we currently convert Spark results to `org.apache.hadoop.hive.serde2.thrift.Type` first and only then to `org.apache.hive.service.rpc.thrift.TTypeId`. The former cannot represent the two timestamp types; in particular, it has no `TIMESTAMPLOCALTZ_TYPE`.
    
    To avoid this, we take a shortcut and map Spark results to the Thrift schema and row set directly.
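    
    For illustration, a minimal sketch of such a direct, one-step mapping in Scala. The method name and the exact type coverage here are illustrative assumptions for this description, not the precise code this PR adds:
    
    ```scala
    import org.apache.hive.service.rpc.thrift.TTypeId
    import org.apache.spark.sql.types._

    // Map a Catalyst DataType straight to a Thrift TTypeId, skipping the
    // intermediate serde2 Type enum that lacks TIMESTAMPLOCALTZ_TYPE.
    def toTTypeId(dt: DataType): TTypeId = dt match {
      case NullType => TTypeId.NULL_TYPE
      case BooleanType => TTypeId.BOOLEAN_TYPE
      case ByteType => TTypeId.TINYINT_TYPE
      case ShortType => TTypeId.SMALLINT_TYPE
      case IntegerType => TTypeId.INT_TYPE
      case LongType => TTypeId.BIGINT_TYPE
      case FloatType => TTypeId.FLOAT_TYPE
      case DoubleType => TTypeId.DOUBLE_TYPE
      case _: DecimalType => TTypeId.DECIMAL_TYPE
      case StringType => TTypeId.STRING_TYPE
      case BinaryType => TTypeId.BINARY_TYPE
      case DateType => TTypeId.DATE_TYPE
      case TimestampType => TTypeId.TIMESTAMP_TYPE
      // TIMESTAMPLOCALTZ_TYPE exists only on the rpc.thrift side, so a future
      // TimestampNTZ/LTZ split becomes a local change to this match.
      case _: ArrayType => TTypeId.ARRAY_TYPE
      case _: MapType => TTypeId.MAP_TYPE
      case _: StructType => TTypeId.STRUCT_TYPE
      case other => throw new IllegalArgumentException(s"Unrecognized type: $other")
    }
    ```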
    
    Besides, it also avoids some unnecessary memory copies from type to type.
    
    Most functionalities have been verified in [apache/kyuubi](https://github.com/apache/incubator-kyuubi) for several years.
    
    ### Why are the changes needed?
    
    To support more Spark data types through Hive JDBC.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Existing unit tests should be sufficient.
    
    Closes #36373 from yaooqinn/SPARK-39041.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../org/apache/hive/service/cli/CLIService.java    |  14 +-
 .../apache/hive/service/cli/CLIServiceClient.java  |   3 +-
 .../org/apache/hive/service/cli/ICLIService.java   |  11 +-
 .../cli/operation/ExecuteStatementOperation.java   |  21 -
 .../cli/operation/GetCatalogsOperation.java        |  18 +-
 .../service/cli/operation/GetColumnsOperation.java |  10 +-
 .../cli/operation/GetCrossReferenceOperation.java  |  10 +-
 .../cli/operation/GetFunctionsOperation.java       |  10 +-
 .../cli/operation/GetPrimaryKeysOperation.java     |  10 +-
 .../service/cli/operation/GetSchemasOperation.java |  10 +-
 .../cli/operation/GetTableTypesOperation.java      |  10 +-
 .../service/cli/operation/GetTablesOperation.java  |  10 +-
 .../cli/operation/GetTypeInfoOperation.java        |  10 +-
 .../cli/operation/HiveCommandOperation.java        | 214 ----------
 .../hive/service/cli/operation/Operation.java      |  12 +-
 .../service/cli/operation/OperationManager.java    |  27 +-
 .../hive/service/cli/operation/SQLOperation.java   | 450 ---------------------
 .../hive/service/cli/session/HiveSession.java      |   6 +-
 .../hive/service/cli/session/HiveSessionImpl.java  |   8 +-
 .../hive/service/cli/thrift/ThriftCLIService.java  |   8 +-
 .../service/cli/thrift/ThriftCLIServiceClient.java |  10 +-
 .../spark/sql/hive/thriftserver/RowSetUtils.scala  | 241 +++++++++++
 .../SparkExecuteStatementOperation.scala           | 185 ++++-----
 .../thriftserver/HiveThriftServer2Suites.scala     |  20 +-
 .../SparkExecuteStatementOperationSuite.scala      |  28 +-
 .../ThriftServerWithSparkContextSuite.scala        |   4 +-
 26 files changed, 454 insertions(+), 906 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
index f4d07d10a43..4164ef42825 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java
@@ -46,6 +46,8 @@ import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 import org.apache.hive.service.server.HiveServer2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -496,9 +498,9 @@ public class CLIService extends CompositeService implements ICLIService {
    * @see org.apache.hive.service.cli.ICLIService#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle)
    */
   @Override
-  public TableSchema getResultSetMetadata(OperationHandle opHandle)
+  public TTableSchema getResultSetMetadata(OperationHandle opHandle)
       throws HiveSQLException {
-    TableSchema tableSchema = sessionManager.getOperationManager()
+    TTableSchema tableSchema = sessionManager.getOperationManager()
         .getOperation(opHandle).getParentSession().getResultSetMetadata(opHandle);
     LOG.debug(opHandle + ": getResultSetMetadata()");
     return tableSchema;
@@ -508,16 +510,16 @@ public class CLIService extends CompositeService implements ICLIService {
    * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle)
    */
   @Override
-  public RowSet fetchResults(OperationHandle opHandle)
+  public TRowSet fetchResults(OperationHandle opHandle)
       throws HiveSQLException {
     return fetchResults(opHandle, Operation.DEFAULT_FETCH_ORIENTATION,
         Operation.DEFAULT_FETCH_MAX_ROWS, FetchType.QUERY_OUTPUT);
   }
 
   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
-                             long maxRows, FetchType fetchType) throws HiveSQLException {
-    RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
+  public TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+      long maxRows, FetchType fetchType) throws HiveSQLException {
+    TRowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle)
        .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType);
     LOG.debug(opHandle + ": fetchResults()");
     return rowSet;
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java
index 684c666da63..964dd5a5899 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java
@@ -20,6 +20,7 @@ package org.apache.hive.service.cli;
 import java.util.Collections;
 
 import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.rpc.thrift.TRowSet;
 
 
 /**
@@ -35,7 +36,7 @@ public abstract class CLIServiceClient implements ICLIService {
   }
 
   @Override
-  public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+  public TRowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
     // TODO: provide STATIC default value
     return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, DEFAULT_MAX_ROWS, FetchType.QUERY_OUTPUT);
   }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/ICLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/ICLIService.java
index 42706f382a5..f896b7a6775 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/ICLIService.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/ICLIService.java
@@ -19,11 +19,10 @@ package org.apache.hive.service.cli;
 import java.util.List;
 import java.util.Map;
 
-
-
-
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 public interface ICLIService {
 
@@ -86,13 +85,13 @@ public interface ICLIService {
   void closeOperation(OperationHandle opHandle)
       throws HiveSQLException;
 
-  TableSchema getResultSetMetadata(OperationHandle opHandle)
+  TTableSchema getResultSetMetadata(OperationHandle opHandle)
       throws HiveSQLException;
 
-  RowSet fetchResults(OperationHandle opHandle)
+  TRowSet fetchResults(OperationHandle opHandle)
       throws HiveSQLException;
 
-  RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+  TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
       long maxRows, FetchType fetchType) throws HiveSQLException;
 
   String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
index 96617dde131..815a369b6b2 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
@@ -16,13 +16,9 @@
  */
 package org.apache.hive.service.cli.operation;
 
-import java.sql.SQLException;
 import java.util.Map;
 
-import org.apache.hadoop.hive.ql.processors.CommandProcessor;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
 import org.apache.hadoop.hive.ql.session.OperationLog;
-import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationType;
 import org.apache.hive.service.cli.session.HiveSession;
 
@@ -39,23 +35,6 @@ public abstract class ExecuteStatementOperation extends Operation {
     return statement;
   }
 
-  public static ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
-     String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
-      throws HiveSQLException {
-    String[] tokens = statement.trim().split("\\s+");
-    CommandProcessor processor = null;
-    try {
-      processor = CommandProcessorFactory.getForHiveCommand(tokens, parentSession.getHiveConf());
-    } catch (SQLException e) {
-      throw new HiveSQLException(e.getMessage(), e.getSQLState(), e);
-    }
-    if (processor == null) {
-      // runAsync, queryTimeout makes sense only for a SQLOperation
-      return new SQLOperation(parentSession, statement, confOverlay, runAsync, queryTimeout);
-    }
-    return new HiveCommandOperation(parentSession, statement, processor, confOverlay);
-  }
-
   protected void registerCurrentOperationLog() {
     if (isOperationLogEnabled) {
       if (operationLog == null) {
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
index 6f2deddecdc..ef4bbb45e8f 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
@@ -18,14 +18,10 @@
 package org.apache.hive.service.cli.operation;
 
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.OperationType;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 /**
  * GetCatalogsOperation.
@@ -61,20 +57,20 @@ public class GetCatalogsOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
-    return RESULT_SET_SCHEMA;
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
index d48ca6174be..250adc51f81 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
@@ -48,6 +48,8 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 /**
  * GetColumnsOperation.
@@ -229,22 +231,22 @@ public class GetColumnsOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    return RESULT_SET_SCHEMA;
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java
index 9b149f0a5bf..3a29859a207 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.serde2.thrift.Type;
 import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 import java.util.List;
 
@@ -140,21 +142,21 @@ public class GetCrossReferenceOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    return RESULT_SET_SCHEMA;
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
index 9cb9ded930a..3f02f753bf8 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
@@ -37,6 +37,8 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 import org.apache.thrift.TException;
 
 /**
@@ -126,21 +128,21 @@ public class GetFunctionsOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    return RESULT_SET_SCHEMA;
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java
index 29842ad30ac..92732834297 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.serde2.thrift.Type;
 import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 import java.util.List;
 
@@ -94,21 +96,21 @@ public class GetPrimaryKeysOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    return RESULT_SET_SCHEMA;
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
index fb74d02773b..865e264bd5f 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java
@@ -27,6 +27,8 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 /**
  * GetSchemasOperation.
@@ -75,21 +77,21 @@ public class GetSchemasOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    return RESULT_SET_SCHEMA;
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
index 115e6651f73..b75eaec5ff6 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java
@@ -28,6 +28,8 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 /**
  * GetTableTypesOperation.
@@ -71,22 +73,22 @@ public class GetTableTypesOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    return RESULT_SET_SCHEMA;
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
index 857f00a5e92..bd9f0814814 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
@@ -35,6 +35,8 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 /**
  * GetTablesOperation.
@@ -122,21 +124,21 @@ public class GetTablesOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    return RESULT_SET_SCHEMA;
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
index b480e548e78..ad692d46edd 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java
@@ -27,6 +27,8 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 /**
  * GetTypeInfoOperation.
@@ -121,21 +123,21 @@ public class GetTypeInfoOperation extends MetadataOperation {
    * @see org.apache.hive.service.cli.Operation#getResultSetSchema()
    */
   @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
+  public TTableSchema getResultSetSchema() throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    return RESULT_SET_SCHEMA;
+    return RESULT_SET_SCHEMA.toTTableSchema();
   }
 
   /* (non-Javadoc)
    * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
    */
   @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
+  public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setStartOffset(0);
     }
-    return rowSet.extractSubset((int)maxRows);
+    return rowSet.extractSubset((int)maxRows).toTRowSet();
   }
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
deleted file mode 100644
index 173256d4782..00000000000
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.service.cli.operation;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.ql.processors.CommandProcessor;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hive.service.ServiceUtils;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.cli.TableSchema;
-import org.apache.hive.service.cli.session.HiveSession;
-
-/**
- * Executes a HiveCommand
- */
-public class HiveCommandOperation extends ExecuteStatementOperation {
-  private CommandProcessor commandProcessor;
-  private TableSchema resultSchema = null;
-
-  /**
-   * For processors other than Hive queries (Driver), they output to session.out (a temp file)
-   * first and the fetchOne/fetchN/fetchAll functions get the output from pipeIn.
-   */
-  private BufferedReader resultReader;
-
-
-  protected HiveCommandOperation(HiveSession parentSession, String statement,
-      CommandProcessor commandProcessor, Map<String, String> confOverlay) {
-    super(parentSession, statement, confOverlay, false);
-    this.commandProcessor = commandProcessor;
-    setupSessionIO(parentSession.getSessionState());
-  }
-
-  private void setupSessionIO(SessionState sessionState) {
-    try {
-      LOG.info("Putting temp output to file " + sessionState.getTmpOutputFile().toString());
-      sessionState.in = null; // hive server's session input stream is not used
-      // open a per-session file in auto-flush mode for writing temp results
-      sessionState.out = new PrintStream(new FileOutputStream(sessionState.getTmpOutputFile()), true, UTF_8.name());
-      // TODO: for hadoop jobs, progress is printed out to session.err,
-      // we should find a way to feed back job progress to client
-      sessionState.err = new PrintStream(System.err, true, UTF_8.name());
-    } catch (IOException e) {
-      LOG.error("Error in creating temp output file ", e);
-      try {
-        sessionState.in = null;
-        sessionState.out = new PrintStream(System.out, true, UTF_8.name());
-        sessionState.err = new PrintStream(System.err, true, UTF_8.name());
-      } catch (UnsupportedEncodingException ee) {
-        LOG.error("Error creating PrintStream", e);
-        ee.printStackTrace();
-        sessionState.out = null;
-        sessionState.err = null;
-      }
-    }
-  }
-
-
-  private void tearDownSessionIO() {
-    ServiceUtils.cleanup(LOG,
-        parentSession.getSessionState().out, parentSession.getSessionState().err);
-  }
-
-  @Override
-  public void runInternal() throws HiveSQLException {
-    setState(OperationState.RUNNING);
-    try {
-      String command = getStatement().trim();
-      String[] tokens = statement.split("\\s");
-      String commandArgs = command.substring(tokens[0].length()).trim();
-
-      CommandProcessorResponse response = commandProcessor.run(commandArgs);
-      int returnCode = response.getResponseCode();
-      if (returnCode != 0) {
-        throw toSQLException("Error while processing statement", response);
-      }
-      Schema schema = response.getSchema();
-      if (schema != null) {
-        setHasResultSet(true);
-        resultSchema = new TableSchema(schema);
-      } else {
-        setHasResultSet(false);
-        resultSchema = new TableSchema();
-      }
-    } catch (HiveSQLException e) {
-      setState(OperationState.ERROR);
-      throw e;
-    } catch (Exception e) {
-      setState(OperationState.ERROR);
-      throw new HiveSQLException("Error running query: " + e.toString(), e);
-    }
-    setState(OperationState.FINISHED);
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hive.service.cli.operation.Operation#close()
-   */
-  @Override
-  public void close() throws HiveSQLException {
-    setState(OperationState.CLOSED);
-    tearDownSessionIO();
-    cleanTmpFile();
-    cleanupOperationLog();
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hive.service.cli.operation.Operation#getResultSetSchema()
-   */
-  @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
-    return resultSchema;
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hive.service.cli.operation.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
-   */
-  @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    validateDefaultFetchOrientation(orientation);
-    if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
-      resetResultReader();
-    }
-    List<String> rows = readResults((int) maxRows);
-    RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false);
-
-    for (String row : rows) {
-      rowSet.addRow(new String[] {row});
-    }
-    return rowSet;
-  }
-
-  /**
-   * Reads the temporary results for non-Hive (non-Driver) commands to the
-   * resulting List of strings.
-   * @param nLines number of lines read at once. If it is <= 0, then read all lines.
-   */
-  private List<String> readResults(int nLines) throws HiveSQLException {
-    if (resultReader == null) {
-      SessionState sessionState = getParentSession().getSessionState();
-      File tmp = sessionState.getTmpOutputFile();
-      try {
-        resultReader = new BufferedReader(new FileReader(tmp));
-      } catch (FileNotFoundException e) {
-        LOG.error("File " + tmp + " not found. ", e);
-        throw new HiveSQLException(e);
-      }
-    }
-    List<String> results = new ArrayList<String>();
-
-    for (int i = 0; i < nLines || nLines <= 0; ++i) {
-      try {
-        String line = resultReader.readLine();
-        if (line == null) {
-          // reached the end of the result file
-          break;
-        } else {
-          results.add(line);
-        }
-      } catch (IOException e) {
-        LOG.error("Reading temp results encountered an exception: ", e);
-        throw new HiveSQLException(e);
-      }
-    }
-    return results;
-  }
-
-  private void cleanTmpFile() {
-    resetResultReader();
-    SessionState sessionState = getParentSession().getSessionState();
-    sessionState.deleteTmpOutputFile();
-    sessionState.deleteTmpErrOutputFile();
-  }
-
-  private void resetResultReader() {
-    if (resultReader != null) {
-      ServiceUtils.cleanup(LOG, resultReader);
-      resultReader = null;
-    }
-  }
-}
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index 9651701f9c3..ad42925207d 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -34,10 +34,10 @@ import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.OperationState;
 import org.apache.hive.service.cli.OperationStatus;
 import org.apache.hive.service.cli.OperationType;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -302,13 +302,9 @@ public abstract class Operation {
     cleanupOperationLog();
   }
 
-  public abstract TableSchema getResultSetSchema() throws HiveSQLException;
+  public abstract TTableSchema getResultSetSchema() throws HiveSQLException;
 
-  public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
-
-  public RowSet getNextRowSet() throws HiveSQLException {
-    return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS);
-  }
+  public abstract TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException;
 
   /**
    * Verify if the given fetch orientation is part of the default orientation types.
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
index 40daa1ff493..5aede567e95 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -37,6 +37,8 @@ import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 import org.apache.logging.log4j.core.appender.AbstractWriterAppender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,20 +87,10 @@ public class OperationManager extends AbstractService {
     ap.start();
   }
 
-  public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
-      String statement, Map<String, String> confOverlay, boolean runAsync)
-          throws HiveSQLException {
-    ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation
-        .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0);
-    addOperation(executeStatementOperation);
-    return executeStatementOperation;
-  }
-
   public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
       String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
           throws HiveSQLException {
-    return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync,
-        queryTimeout);
+      throw new UnsupportedOperationException();
   }
 
   public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
@@ -230,23 +222,18 @@ public class OperationManager extends AbstractService {
     operation.close();
   }
 
-  public TableSchema getOperationResultSetSchema(OperationHandle opHandle)
+  public TTableSchema getOperationResultSetSchema(OperationHandle opHandle)
       throws HiveSQLException {
     return getOperation(opHandle).getResultSetSchema();
   }
 
-  public RowSet getOperationNextRowSet(OperationHandle opHandle)
-      throws HiveSQLException {
-    return getOperation(opHandle).getNextRowSet();
-  }
-
-  public RowSet getOperationNextRowSet(OperationHandle opHandle,
+  public TRowSet getOperationNextRowSet(OperationHandle opHandle,
       FetchOrientation orientation, long maxRows)
           throws HiveSQLException {
     return getOperation(opHandle).getNextRowSet(orientation, maxRows);
   }
 
-  public RowSet getOperationLogRowSet(OperationHandle opHandle,
+  public TRowSet getOperationLogRowSet(OperationHandle opHandle,
       FetchOrientation orientation, long maxRows)
           throws HiveSQLException {
     // get the OperationLog object from the operation
@@ -272,7 +259,7 @@ public class OperationManager extends AbstractService {
       rowSet.addRow(new String[] {log});
     }
 
-    return rowSet;
+    return rowSet.toTRowSet();
   }
 
   private boolean isFetchFirst(FetchOrientation fetchOrientation) {
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java
deleted file mode 100644
index ce704b281df..00000000000
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.service.cli.operation;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.security.PrivilegedExceptionAction;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.ExplainTask;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.cli.TableSchema;
-import org.apache.hive.service.cli.session.HiveSession;
-import org.apache.hive.service.server.ThreadWithGarbageCleanup;
-
-/**
- * SQLOperation.
- *
- */
-public class SQLOperation extends ExecuteStatementOperation {
-
-  private Driver driver = null;
-  private CommandProcessorResponse response;
-  private TableSchema resultSchema = null;
-  private Schema mResultSchema = null;
-  private AbstractSerDe serde = null;
-  private boolean fetchStarted = false;
-
-  public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay,
-      boolean runInBackground, long queryTimeout) {
-    // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
-    super(parentSession, statement, confOverlay, runInBackground);
-  }
-
-  /***
-   * Compile the query and extract metadata
-   * @param queryState
-   * @throws HiveSQLException
-   */
-  public void prepare(QueryState queryState) throws HiveSQLException {
-    setState(OperationState.RUNNING);
-
-    try {
-      driver = new Driver(queryState, getParentSession().getUserName());
-
-      // set the operation handle information in Driver, so that thrift API users
-      // can use the operation handle they receive, to lookup query information in
-      // Yarn ATS
-      String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier()
-          .toTHandleIdentifier().getGuid()).trim();
-      driver.setOperationId(guid64);
-
-      // In Hive server mode, we are not able to retry in the FetchTask
-      // case, when calling fetch queries since execute() has returned.
-      // For now, we disable the test attempts.
-      driver.setTryCount(Integer.MAX_VALUE);
-
-      response = driver.compileAndRespond(statement);
-      if (0 != response.getResponseCode()) {
-        throw toSQLException("Error while compiling statement", response);
-      }
-
-      mResultSchema = driver.getSchema();
-
-      // hasResultSet should be true only if the query has a FetchTask
-      // "explain" is an exception for now
-      if(driver.getPlan().getFetchTask() != null) {
-        //Schema has to be set
-        if (mResultSchema == null || !mResultSchema.isSetFieldSchemas()) {
-          throw new HiveSQLException("Error compiling query: Schema and 
FieldSchema " +
-              "should be set when query plan has a FetchTask");
-        }
-        resultSchema = new TableSchema(mResultSchema);
-        setHasResultSet(true);
-      } else {
-        setHasResultSet(false);
-      }
-      // Set hasResultSet true if the plan has ExplainTask
-      // TODO explain should use a FetchTask for reading
-      for (Task<? extends Serializable> task: driver.getPlan().getRootTasks()) {
-        if (task.getClass() == ExplainTask.class) {
-          resultSchema = new TableSchema(mResultSchema);
-          setHasResultSet(true);
-          break;
-        }
-      }
-    } catch (HiveSQLException e) {
-      setState(OperationState.ERROR);
-      throw e;
-    } catch (Exception e) {
-      setState(OperationState.ERROR);
-      throw new HiveSQLException("Error running query: " + e.toString(), e);
-    }
-  }
-
-  private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException {
-    try {
-      // In Hive server mode, we are not able to retry in the FetchTask
-      // case, when calling fetch queries since execute() has returned.
-      // For now, we disable the test attempts.
-      driver.setTryCount(Integer.MAX_VALUE);
-      response = driver.run();
-      if (0 != response.getResponseCode()) {
-        throw toSQLException("Error while processing statement", response);
-      }
-    } catch (HiveSQLException e) {
-      // If the operation was cancelled by another thread or timed out,
-      // Driver#run will return a non-zero response code.
-      // We will simply return if the operation state is CANCELED or TIMEDOUT,
-      // otherwise throw an exception
-      if (getStatus().getState() == OperationState.CANCELED ||
-          getStatus().getState() == OperationState.TIMEDOUT) {
-        return;
-      }
-      else {
-        setState(OperationState.ERROR);
-        throw e;
-      }
-    } catch (Exception e) {
-      setState(OperationState.ERROR);
-      throw new HiveSQLException("Error running query: " + e.toString(), e);
-    }
-    setState(OperationState.FINISHED);
-  }
-
-  @Override
-  public void runInternal() throws HiveSQLException {
-    setState(OperationState.PENDING);
-    final HiveConf opConfig = getConfigForOperation();
-    prepare(queryState);
-    if (!shouldRunAsync()) {
-      runQuery(opConfig);
-    } else {
-      // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
-      final SessionState parentSessionState = SessionState.get();
-      // ThreadLocal Hive object needs to be set in background thread.
-      // The metastore client in Hive is associated with right user.
-      final Hive parentHive = getSessionHive();
-      // Current UGI will get used by metastore when metsatore is in embedded mode
-      // So this needs to get passed to the new background thread
-      final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
-      // Runnable impl to call runInternal asynchronously,
-      // from a different thread
-      Runnable backgroundOperation = () -> {
-        PrivilegedExceptionAction<Object> doAsAction = () -> {
-          Hive.set(parentHive);
-          SessionState.setCurrentSessionState(parentSessionState);
-          // Set current OperationLog in this async thread for keeping on saving query log.
-          registerCurrentOperationLog();
-          try {
-            runQuery(opConfig);
-          } catch (HiveSQLException e) {
-            setOperationException(e);
-            LOG.error("Error running hive query: ", e);
-          } finally {
-            unregisterOperationLog();
-          }
-          return null;
-        };
-
-        try {
-          currentUGI.doAs(doAsAction);
-        } catch (Exception e) {
-          setOperationException(new HiveSQLException(e));
-          LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
-        }
-        finally {
-          /**
-           * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
-           * when this thread is garbage collected later.
-           * @see ThreadWithGarbageCleanup#finalize()
-           */
-          if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
-            ThreadWithGarbageCleanup currentThread =
-                (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
-            currentThread.cacheThreadLocalRawStore();
-          }
-        }
-      };
-      try {
-        // This submit blocks if no background threads are available to run this operation
-        Future<?> backgroundHandle =
-            getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
-        setBackgroundHandle(backgroundHandle);
-      } catch (RejectedExecutionException rejected) {
-        setState(OperationState.ERROR);
-        throw new HiveSQLException("The background threadpool cannot accept" +
-            " new task for execution, please retry the operation", rejected);
-      }
-    }
-  }
-
-  /**
-   * Returns the current UGI on the stack
-   * @param opConfig
-   * @return UserGroupInformation
-   * @throws HiveSQLException
-   */
-  private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException {
-    try {
-      return Utils.getUGI();
-    } catch (Exception e) {
-      throw new HiveSQLException("Unable to get current user", e);
-    }
-  }
-
-  /**
-   * Returns the ThreadLocal Hive for the current thread
-   * @return Hive
-   * @throws HiveSQLException
-   */
-  private Hive getSessionHive() throws HiveSQLException {
-    try {
-      return Hive.get();
-    } catch (HiveException e) {
-      throw new HiveSQLException("Failed to get ThreadLocal Hive object", e);
-    }
-  }
-
-  private void cleanup(OperationState state) throws HiveSQLException {
-    setState(state);
-    if (shouldRunAsync()) {
-      Future<?> backgroundHandle = getBackgroundHandle();
-      if (backgroundHandle != null) {
-        backgroundHandle.cancel(true);
-      }
-    }
-    if (driver != null) {
-      driver.close();
-      driver.destroy();
-    }
-    driver = null;
-
-    SessionState ss = SessionState.get();
-    if (ss.getTmpOutputFile() != null) {
-      ss.getTmpOutputFile().delete();
-    }
-  }
-
-  @Override
-  public void cancel() throws HiveSQLException {
-    cleanup(OperationState.CANCELED);
-  }
-
-  @Override
-  public void close() throws HiveSQLException {
-    cleanup(OperationState.CLOSED);
-    cleanupOperationLog();
-  }
-
-  @Override
-  public TableSchema getResultSetSchema() throws HiveSQLException {
-    assertState(OperationState.FINISHED);
-    if (resultSchema == null) {
-      resultSchema = new TableSchema(driver.getSchema());
-    }
-    return resultSchema;
-  }
-
-  private final transient List<Object> convey = new ArrayList<Object>();
-
-  @Override
-  public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    validateDefaultFetchOrientation(orientation);
-    assertState(OperationState.FINISHED);
-
-    RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false);
-
-    try {
-      /* if client is requesting fetch-from-start and its not the first time reading from this operation
-       * then reset the fetch position to beginning
-       */
-      if (orientation.equals(FetchOrientation.FETCH_FIRST) && fetchStarted) {
-        driver.resetFetch();
-      }
-      fetchStarted = true;
-      driver.setMaxRows((int) maxRows);
-      if (driver.getResults(convey)) {
-        return decode(convey, rowSet);
-      }
-      return rowSet;
-    } catch (IOException e) {
-      throw new HiveSQLException(e);
-    } catch (CommandNeedRetryException e) {
-      throw new HiveSQLException(e);
-    } catch (Exception e) {
-      throw new HiveSQLException(e);
-    } finally {
-      convey.clear();
-    }
-  }
-
-  private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception {
-    if (driver.isFetchingTable()) {
-      return prepareFromRow(rows, rowSet);
-    }
-    return decodeFromString(rows, rowSet);
-  }
-
-  // already encoded to thrift-able object in ThriftFormatter
-  private RowSet prepareFromRow(List<Object> rows, RowSet rowSet) throws Exception {
-    for (Object row : rows) {
-      rowSet.addRow((Object[]) row);
-    }
-    return rowSet;
-  }
-
-  private RowSet decodeFromString(List<Object> rows, RowSet rowSet)
-      throws SQLException, SerDeException {
-    getSerDe();
-    StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
-    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
-
-    Object[] deserializedFields = new Object[fieldRefs.size()];
-    Object rowObj;
-    ObjectInspector fieldOI;
-
-    int protocol = getProtocolVersion().getValue();
-    for (Object rowString : rows) {
-      rowObj = serde.deserialize(new BytesWritable(((String)rowString).getBytes(UTF_8)));
-      for (int i = 0; i < fieldRefs.size(); i++) {
-        StructField fieldRef = fieldRefs.get(i);
-        fieldOI = fieldRef.getFieldObjectInspector();
-        Object fieldData = soi.getStructFieldData(rowObj, fieldRef);
-        deserializedFields[i] = SerDeUtils.toThriftPayload(fieldData, fieldOI, protocol);
-      }
-      rowSet.addRow(deserializedFields);
-    }
-    return rowSet;
-  }
-
-  private AbstractSerDe getSerDe() throws SQLException {
-    if (serde != null) {
-      return serde;
-    }
-    try {
-      List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
-      StringBuilder namesSb = new StringBuilder();
-      StringBuilder typesSb = new StringBuilder();
-
-      if (fieldSchemas != null && !fieldSchemas.isEmpty()) {
-        for (int pos = 0; pos < fieldSchemas.size(); pos++) {
-          if (pos != 0) {
-            namesSb.append(",");
-            typesSb.append(",");
-          }
-          namesSb.append(fieldSchemas.get(pos).getName());
-          typesSb.append(fieldSchemas.get(pos).getType());
-        }
-      }
-      String names = namesSb.toString();
-      String types = typesSb.toString();
-
-      serde = new LazySimpleSerDe();
-      Properties props = new Properties();
-      if (names.length() > 0) {
-        LOG.debug("Column names: " + names);
-        props.setProperty(serdeConstants.LIST_COLUMNS, names);
-      }
-      if (types.length() > 0) {
-        LOG.debug("Column types: " + types);
-        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
-      }
-      SerDeUtils.initializeSerDe(serde, new HiveConf(), props, null);
-
-    } catch (Exception ex) {
-      ex.printStackTrace();
-      throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex);
-    }
-    return serde;
-  }
-
-  /**
-   * If there are query specific settings to overlay, then create a copy of config
-   * There are two cases we need to clone the session config that's being passed to hive driver
-   * 1. Async query -
-   *    If the client changes a config setting, that shouldn't reflect in the execution already underway
-   * 2. confOverlay -
-   *    The query specific settings should only be applied to the query config and not session
-   * @return new configuration
-   * @throws HiveSQLException
-   */
-  private HiveConf getConfigForOperation() throws HiveSQLException {
-    HiveConf sqlOperationConf = getParentSession().getHiveConf();
-    if (!getConfOverlay().isEmpty() || shouldRunAsync()) {
-      // clone the parent session config for this query
-      sqlOperationConf = new HiveConf(sqlOperationConf);
-
-      // apply overlay query specific settings, if any
-      for (Map.Entry<String, String> confEntry : getConfOverlay().entrySet()) {
-        try {
-          sqlOperationConf.verifyAndSet(confEntry.getKey(), confEntry.getValue());
-        } catch (IllegalArgumentException e) {
-          throw new HiveSQLException("Error applying statement specific 
settings", e);
-        }
-      }
-    }
-    return sqlOperationConf;
-  }
-}
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java
index ca0540e5aac..b1bb2096822 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.*;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 
 public interface HiveSession extends HiveSessionBase {
 
@@ -182,10 +184,10 @@ public interface HiveSession extends HiveSessionBase {
 
   void closeOperation(OperationHandle opHandle) throws HiveSQLException;
 
-  TableSchema getResultSetMetadata(OperationHandle opHandle)
+  TTableSchema getResultSetMetadata(OperationHandle opHandle)
       throws HiveSQLException;
 
-  RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+  TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
       long maxRows, FetchType fetchType) throws HiveSQLException;
 
   String getDelegationToken(HiveAuthFactory authFactory, String owner,
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 8e1e500ff78..4ac3c1e5887 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -52,9 +52,7 @@ import org.apache.hive.service.cli.GetInfoType;
 import org.apache.hive.service.cli.GetInfoValue;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
 import org.apache.hive.service.cli.operation.GetCatalogsOperation;
 import org.apache.hive.service.cli.operation.GetColumnsOperation;
@@ -68,6 +66,8 @@ import org.apache.hive.service.cli.operation.MetadataOperation;
 import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRowSet;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -789,7 +789,7 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   @Override
-  public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
+  public TTableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
     acquire(true);
     try {
      return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
@@ -799,7 +799,7 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+  public TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
       long maxRows, FetchType fetchType) throws HiveSQLException {
     acquire(true);
     try {
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 4a223c8666a..3517df908b2 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -624,8 +624,8 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
       throws TException {
     TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp();
     try {
-      TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
-      resp.setSchema(schema.toTTableSchema());
+      TTableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle()));
+      resp.setSchema(schema);
       resp.setStatus(OK_STATUS);
     } catch (Exception e) {
       LOG.warn("Error getting result set metadata: ", e);
@@ -638,12 +638,12 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
  public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
     TFetchResultsResp resp = new TFetchResultsResp();
     try {
-      RowSet rowSet = cliService.fetchResults(
+      TRowSet rowSet = cliService.fetchResults(
           new OperationHandle(req.getOperationHandle()),
           FetchOrientation.getFetchOrientation(req.getOrientation()),
           req.getMaxRows(),
           FetchType.getFetchType(req.getFetchType()));
-      resp.setResults(rowSet.toTRowSet());
+      resp.setResults(rowSet);
       resp.setHasMoreRows(false);
       resp.setStatus(OK_STATUS);
     } catch (Exception e) {
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
index 6fc2d3a5fa9..4cfb48e0640 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
@@ -362,13 +362,13 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
   * @see org.apache.hive.service.cli.ICLIService#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle)
    */
   @Override
-  public TableSchema getResultSetMetadata(OperationHandle opHandle)
+  public TTableSchema getResultSetMetadata(OperationHandle opHandle)
       throws HiveSQLException {
     try {
      TGetResultSetMetadataReq req = new TGetResultSetMetadataReq(opHandle.toTOperationHandle());
       TGetResultSetMetadataResp resp = cliService.GetResultSetMetadata(req);
       checkStatus(resp.getStatus());
-      return new TableSchema(resp.getSchema());
+      return resp.getSchema();
     } catch (HiveSQLException e) {
       throw e;
     } catch (Exception e) {
@@ -377,7 +377,7 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
   }
 
   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+  public TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
       FetchType fetchType) throws HiveSQLException {
     try {
       TFetchResultsReq req = new TFetchResultsReq();
@@ -387,7 +387,7 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
       req.setFetchType(fetchType.toTFetchType());
       TFetchResultsResp resp = cliService.FetchResults(req);
       checkStatus(resp.getStatus());
-      return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion());
+      return resp.getResults();
     } catch (HiveSQLException e) {
       throw e;
     } catch (Exception e) {
@@ -399,7 +399,7 @@ public class ThriftCLIServiceClient extends CLIServiceClient {
   * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle)
    */
   @Override
-  public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
+  public TRowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
     // TODO: set the correct default fetch size
    return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT);
   }
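For reference, a minimal client-side sketch (illustrative only, not part of this patch; client and opHandle are assumed to be an existing ThriftCLIServiceClient and OperationHandle): since fetchResults now returns the wire-level TRowSet unchanged, a caller that still wants the old iterator-style view can rebuild it with RowSetFactory, as the updated tests further below do.

    import scala.collection.JavaConverters._

    import org.apache.hive.service.cli.{FetchOrientation, FetchType, RowSetFactory}

    // fetchResults now hands back the thrift-level TRowSet as-is.
    val tRowSet = client.fetchResults(
      opHandle, FetchOrientation.FETCH_NEXT, 1000, FetchType.QUERY_OUTPUT)

    // Rebuild the iterator-style RowSet view on the client side when needed.
    val rowSet = RowSetFactory.create(tRowSet, opHandle.getProtocolVersion)
    rowSet.iterator.asScala.foreach(row => println(row.mkString("\t")))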
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala
new file mode 100644
index 00000000000..9625021f392
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import org.apache.hive.service.rpc.thrift._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.HiveResult.{toHiveString, TimeFormatters}
+import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType}
+
+object RowSetUtils {
+
+  implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
+    ByteBuffer.wrap(bitSet.toByteArray)
+  }
+
+  def toTRowSet(
+      startRowOffSet: Long,
+      rows: Seq[Row],
+      schema: Array[DataType],
+      protocolVersion: TProtocolVersion,
+      timeFormatters: TimeFormatters): TRowSet = {
+    if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
+      toRowBasedSet(startRowOffSet, rows, schema, timeFormatters)
+    } else {
+      toColumnBasedSet(startRowOffSet, rows, schema, timeFormatters)
+    }
+  }
+
+  private def toRowBasedSet(
+      startRowOffSet: Long,
+      rows: Seq[Row],
+      schema: Array[DataType],
+      timeFormatters: TimeFormatters): TRowSet = {
+    var i = 0
+    val rowSize = rows.length
+    val tRows = new java.util.ArrayList[TRow](rowSize)
+    while (i < rowSize) {
+      val row = rows(i)
+      val tRow = new TRow()
+      var j = 0
+      val columnSize = row.length
+      while (j < columnSize) {
+        val columnValue = toTColumnValue(j, row, schema(j), timeFormatters)
+        tRow.addToColVals(columnValue)
+        j += 1
+      }
+      i += 1
+      tRows.add(tRow)
+    }
+    new TRowSet(startRowOffSet, tRows)
+  }
+
+  private def toColumnBasedSet(
+      startRowOffSet: Long,
+      rows: Seq[Row],
+      schema: Array[DataType],
+      timeFormatters: TimeFormatters): TRowSet = {
+    val rowSize = rows.length
+    val tRowSet = new TRowSet(startRowOffSet, new java.util.ArrayList[TRow](rowSize))
+    var i = 0
+    val columnSize = schema.length
+    while (i < columnSize) {
+      val tColumn = toTColumn(rows, i, schema(i), timeFormatters)
+      tRowSet.addToColumns(tColumn)
+      i += 1
+    }
+    tRowSet
+  }
+
+  private def toTColumn(
+      rows: Seq[Row], ordinal: Int, typ: DataType, timeFormatters: TimeFormatters): TColumn = {
+    val nulls = new java.util.BitSet()
+    typ match {
+      case BooleanType =>
+        val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
+        TColumn.boolVal(new TBoolColumn(values, nulls))
+
+      case ByteType =>
+        val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
+        TColumn.byteVal(new TByteColumn(values, nulls))
+
+      case ShortType =>
+        val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
+        TColumn.i16Val(new TI16Column(values, nulls))
+
+      case IntegerType =>
+        val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
+        TColumn.i32Val(new TI32Column(values, nulls))
+
+      case LongType =>
+        val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
+        TColumn.i64Val(new TI64Column(values, nulls))
+
+      case FloatType =>
+        val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
+          .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
+        TColumn.doubleVal(new TDoubleColumn(values, nulls))
+
+      case DoubleType =>
+        val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
+        TColumn.doubleVal(new TDoubleColumn(values, nulls))
+
+      case StringType =>
+        val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "")
+        TColumn.stringVal(new TStringColumn(values, nulls))
+
+      case BinaryType =>
+        val values = getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array.empty[Byte])
+          .asScala
+          .map(ByteBuffer.wrap)
+          .asJava
+        TColumn.binaryVal(new TBinaryColumn(values, nulls))
+
+      case _ =>
+        var i = 0
+        val rowSize = rows.length
+        val values = new java.util.ArrayList[String](rowSize)
+        while (i < rowSize) {
+          val row = rows(i)
+          nulls.set(i, row.isNullAt(ordinal))
+          val value = if (row.isNullAt(ordinal)) {
+            ""
+          } else {
+            toHiveString((row.get(ordinal), typ), nested = true, timeFormatters)
+          }
+          values.add(value)
+          i += 1
+        }
+        TColumn.stringVal(new TStringColumn(values, nulls))
+    }
+  }
+
+  private def getOrSetAsNull[T](
+      rows: Seq[Row],
+      ordinal: Int,
+      nulls: java.util.BitSet,
+      defaultVal: T): java.util.List[T] = {
+    val size = rows.length
+    val ret = new java.util.ArrayList[T](size)
+    var idx = 0
+    while (idx < size) {
+      val row = rows(idx)
+      if (row.isNullAt(ordinal)) {
+        nulls.set(idx, true)
+        ret.add(idx, defaultVal)
+      } else {
+        ret.add(idx, row.getAs[T](ordinal))
+      }
+      idx += 1
+    }
+    ret
+  }
+
+  private def toTColumnValue(
+      ordinal: Int,
+      row: Row,
+      dataType: DataType,
+      timeFormatters: TimeFormatters): TColumnValue = {
+    dataType match {
+      case BooleanType =>
+        val boolValue = new TBoolValue
+        if (!row.isNullAt(ordinal)) boolValue.setValue(row.getBoolean(ordinal))
+        TColumnValue.boolVal(boolValue)
+
+      case ByteType =>
+        val byteValue = new TByteValue
+        if (!row.isNullAt(ordinal)) byteValue.setValue(row.getByte(ordinal))
+        TColumnValue.byteVal(byteValue)
+
+      case ShortType =>
+        val tI16Value = new TI16Value
+        if (!row.isNullAt(ordinal)) tI16Value.setValue(row.getShort(ordinal))
+        TColumnValue.i16Val(tI16Value)
+
+      case IntegerType =>
+        val tI32Value = new TI32Value
+        if (!row.isNullAt(ordinal)) tI32Value.setValue(row.getInt(ordinal))
+        TColumnValue.i32Val(tI32Value)
+
+      case LongType =>
+        val tI64Value = new TI64Value
+        if (!row.isNullAt(ordinal)) tI64Value.setValue(row.getLong(ordinal))
+        TColumnValue.i64Val(tI64Value)
+
+      case FloatType =>
+        val tDoubleValue = new TDoubleValue
+        if (!row.isNullAt(ordinal)) {
+          // Floats are converted to doubles during thrift transportation.
+          // Passing float to Double.valueOf causes precision loss, e.g.
+          // scala> java.lang.Double.valueOf(0.1f)
+          // res0: Double = 0.10000000149011612
+          //
+          // hereby toString is called ahead.
+          // scala> java.lang.Double.valueOf(0.1f.toString)
+          // res1: Double = 0.1
+          val doubleValue = java.lang.Double.valueOf(row.getFloat(ordinal).toString)
+          tDoubleValue.setValue(doubleValue)
+        }
+        TColumnValue.doubleVal(tDoubleValue)
+
+      case DoubleType =>
+        val tDoubleValue = new TDoubleValue
+        if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getDouble(ordinal))
+        TColumnValue.doubleVal(tDoubleValue)
+
+      case StringType =>
+        val tStringValue = new TStringValue
+        if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getString(ordinal))
+        TColumnValue.stringVal(tStringValue)
+
+      case _ =>
+        val tStrValue = new TStringValue
+        if (!row.isNullAt(ordinal)) {
+          val value = toHiveString((row.get(ordinal), dataType), nested = false, timeFormatters)
+          tStrValue.setValue(value)
+        }
+        TColumnValue.stringVal(tStrValue)
+    }
+  }
+}
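As a usage illustration only (a sketch, not part of the patch; spark below is assumed to be an active SparkSession), the new helper converts collected rows straight into a thrift TRowSet, choosing the column-based encoding for protocol V6 and later:

    import org.apache.hive.service.rpc.thrift.TProtocolVersion

    import org.apache.spark.sql.execution.HiveResult.getTimeFormatters
    import org.apache.spark.sql.hive.thriftserver.RowSetUtils

    val df = spark.sql("SELECT id, CAST(id AS STRING) AS s FROM range(3)")
    val rows = df.collect().toSeq
    val types = df.schema.fields.map(_.dataType)

    // V6 here selects the column-based TRowSet; protocols older than V6
    // would get the row-based encoding instead.
    val tRowSet = RowSetUtils.toTRowSet(
      0L, rows, types, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6, getTimeFormatters)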
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 4f4088990a9..2c77e00c46c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -18,26 +18,23 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
-import java.util.{Arrays, Map => JMap}
+import java.util.{Collections, Map => JMap}
 import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.Utils
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId, TTypeQualifiers, TTypeQualifierValue}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
-import org.apache.spark.sql.execution.HiveResult.{getTimeFormatters, toHiveString, TimeFormatters}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.VariableSubstitution
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.execution.HiveResult.getTimeFormatters
+import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.{Utils => SparkUtils}
 
 private[hive] class SparkExecuteStatementOperation(
@@ -71,62 +68,20 @@ private[hive] class SparkExecuteStatementOperation(
 
   private var result: DataFrame = _
 
-  private var iter: FetchIterator[SparkRow] = _
+  private var iter: FetchIterator[Row] = _
   private var dataTypes: Array[DataType] = _
 
-  private lazy val resultSchema: TableSchema = {
+  private lazy val resultSchema: TTableSchema = {
     if (result == null || result.schema.isEmpty) {
-      new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
+      val sparkType = new StructType().add("Result", "string")
+      SparkExecuteStatementOperation.toTTableSchema(sparkType)
     } else {
       logInfo(s"Result Schema: ${result.schema}")
-      SparkExecuteStatementOperation.getTableSchema(result.schema)
+      SparkExecuteStatementOperation.toTTableSchema(result.schema)
     }
   }
 
-  def addNonNullColumnValue(
-      from: SparkRow,
-      to: ArrayBuffer[Any],
-      ordinal: Int,
-      timeFormatters: TimeFormatters): Unit = {
-    dataTypes(ordinal) match {
-      case StringType =>
-        to += from.getString(ordinal)
-      case IntegerType =>
-        to += from.getInt(ordinal)
-      case BooleanType =>
-        to += from.getBoolean(ordinal)
-      case DoubleType =>
-        to += from.getDouble(ordinal)
-      case FloatType =>
-        to += from.getFloat(ordinal)
-      case DecimalType() =>
-        to += from.getDecimal(ordinal)
-      case LongType =>
-        to += from.getLong(ordinal)
-      case ByteType =>
-        to += from.getByte(ordinal)
-      case ShortType =>
-        to += from.getShort(ordinal)
-      case BinaryType =>
-        to += from.getAs[Array[Byte]](ordinal)
-      // SPARK-31859, SPARK-31861: Date and Timestamp need to be turned to String here to:
-      // - respect spark.sql.session.timeZone
-      // - work with spark.sql.datetime.java8API.enabled
-      // These types have always been sent over the wire as string, converted later.
-      case _: DateType | _: TimestampType =>
-        to += toHiveString((from.get(ordinal), dataTypes(ordinal)), false, timeFormatters)
-      case CalendarIntervalType =>
-        to += toHiveString(
-          (from.getAs[CalendarInterval](ordinal), CalendarIntervalType),
-          false,
-          timeFormatters)
-      case _: ArrayType | _: StructType | _: MapType | _: UserDefinedType[_] |
-          _: AnsiIntervalType | _: TimestampNTZType =>
-        to += toHiveString((from.get(ordinal), dataTypes(ordinal)), false, timeFormatters)
-    }
-  }
-
-  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
+  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): TRowSet = withLocalProperties {
     try {
      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
       getNextRowSetInternal(order, maxRowsL)
@@ -137,13 +92,12 @@ private[hive] class SparkExecuteStatementOperation(
 
   private def getNextRowSetInternal(
       order: FetchOrientation,
-      maxRowsL: Long): RowSet = withLocalProperties {
+      maxRowsL: Long): TRowSet = withLocalProperties {
     log.info(s"Received getNextRowSet request order=${order} and 
maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
-    val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion, false)
 
     if (order.equals(FetchOrientation.FETCH_FIRST)) {
       iter.fetchAbsolute(0)
@@ -152,36 +106,15 @@ private[hive] class SparkExecuteStatementOperation(
     } else {
       iter.fetchNext()
     }
-    resultRowSet.setStartOffset(iter.getPosition)
-    if (!iter.hasNext) {
-      resultRowSet
-    } else {
-      val timeFormatters = getTimeFormatters
-      // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
-      val maxRows = maxRowsL.toInt
-      var curRow = 0
-      while (curRow < maxRows && iter.hasNext) {
-        val sparkRow = iter.next()
-        val row = ArrayBuffer[Any]()
-        var curCol = 0
-        while (curCol < sparkRow.length) {
-          if (sparkRow.isNullAt(curCol)) {
-            row += null
-          } else {
-            addNonNullColumnValue(sparkRow, row, curCol, timeFormatters)
-          }
-          curCol += 1
-        }
-        resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
-        curRow += 1
-      }
-      log.info(s"Returning result set with ${curRow} rows from offsets " +
-        s"[${iter.getFetchStart}, ${iter.getPosition}) with $statementId")
-      resultRowSet
-    }
+    val maxRows = maxRowsL.toInt
+    val offset = iter.getPosition
+    val rows = iter.take(maxRows).toList
+    log.info(s"Returning result set with ${rows.length} rows from offsets " +
+      s"[${iter.getFetchStart}, ${offset}) with $statementId")
+    RowSetUtils.toTRowSet(offset, rows, dataTypes, getProtocolVersion, getTimeFormatters)
   }
 
-  def getResultSetSchema: TableSchema = resultSchema
+  def getResultSetSchema: TTableSchema = resultSchema
 
   override def runInternal(): Unit = {
     setState(OperationState.PENDING)
@@ -293,11 +226,11 @@ private[hive] class SparkExecuteStatementOperation(
       HiveThriftServer2.eventManager.onStatementParsed(statementId,
         result.queryExecution.toString())
      iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
-        new IterableFetchIterator[SparkRow](new Iterable[SparkRow] {
-          override def iterator: Iterator[SparkRow] = result.toLocalIterator.asScala
+        new IterableFetchIterator[Row](new Iterable[Row] {
+          override def iterator: Iterator[Row] = result.toLocalIterator.asScala
         })
       } else {
-        new ArrayFetchIterator[SparkRow](result.collect())
+        new ArrayFetchIterator[Row](result.collect())
       }
       dataTypes = result.schema.fields.map(_.dataType)
     } catch {
@@ -373,17 +306,69 @@ private[hive] class SparkExecuteStatementOperation(
 }
 
 object SparkExecuteStatementOperation {
-  def getTableSchema(structType: StructType): TableSchema = {
-    val schema = structType.map { field =>
-      val attrTypeString = field.dataType match {
-        case CalendarIntervalType => StringType.catalogString
-        case _: YearMonthIntervalType => "interval_year_month"
-        case _: DayTimeIntervalType => "interval_day_time"
-        case _: TimestampNTZType => "timestamp"
-        case other => other.catalogString
-      }
-      new FieldSchema(field.name, attrTypeString, field.getComment.getOrElse(""))
+
+  def toTTypeId(typ: DataType): TTypeId = typ match {
+    case NullType => TTypeId.NULL_TYPE
+    case BooleanType => TTypeId.BOOLEAN_TYPE
+    case ByteType => TTypeId.TINYINT_TYPE
+    case ShortType => TTypeId.SMALLINT_TYPE
+    case IntegerType => TTypeId.INT_TYPE
+    case LongType => TTypeId.BIGINT_TYPE
+    case FloatType => TTypeId.FLOAT_TYPE
+    case DoubleType => TTypeId.DOUBLE_TYPE
+    case StringType => TTypeId.STRING_TYPE
+    case _: DecimalType => TTypeId.DECIMAL_TYPE
+    case DateType => TTypeId.DATE_TYPE
+    // TODO: Shall use TIMESTAMPLOCALTZ_TYPE, keep AS-IS now for
+    // unnecessary behavior change
+    case TimestampType => TTypeId.TIMESTAMP_TYPE
+    case TimestampNTZType => TTypeId.TIMESTAMP_TYPE
+    case BinaryType => TTypeId.BINARY_TYPE
+    case CalendarIntervalType => TTypeId.STRING_TYPE
+    case _: DayTimeIntervalType => TTypeId.INTERVAL_DAY_TIME_TYPE
+    case _: YearMonthIntervalType => TTypeId.INTERVAL_YEAR_MONTH_TYPE
+    case _: ArrayType => TTypeId.ARRAY_TYPE
+    case _: MapType => TTypeId.MAP_TYPE
+    case _: StructType => TTypeId.STRUCT_TYPE
+    case other =>
+      throw new IllegalArgumentException(s"Unrecognized type name: ${other.catalogString}")
+  }
+
+  private def toTTypeQualifiers(typ: DataType): TTypeQualifiers = {
+    val ret = new TTypeQualifiers()
+    val qualifiers = typ match {
+      case d: DecimalType =>
+        Map(
+          TCLIServiceConstants.PRECISION -> TTypeQualifierValue.i32Value(d.precision),
+          TCLIServiceConstants.SCALE -> TTypeQualifierValue.i32Value(d.scale)).asJava
+      case _ => Collections.emptyMap[String, TTypeQualifierValue]()
+    }
+    ret.setQualifiers(qualifiers)
+    ret
+  }
+
+  private def toTTypeDesc(typ: DataType): TTypeDesc = {
+    val typeEntry = new TPrimitiveTypeEntry(toTTypeId(typ))
+    typeEntry.setTypeQualifiers(toTTypeQualifiers(typ))
+    val tTypeDesc = new TTypeDesc()
+    tTypeDesc.addToTypes(TTypeEntry.primitiveEntry(typeEntry))
+    tTypeDesc
+  }
+
+  private def toTColumnDesc(field: StructField, pos: Int): TColumnDesc = {
+    val tColumnDesc = new TColumnDesc()
+    tColumnDesc.setColumnName(field.name)
+    tColumnDesc.setTypeDesc(toTTypeDesc(field.dataType))
+    tColumnDesc.setComment(field.getComment().getOrElse(""))
+    tColumnDesc.setPosition(pos)
+    tColumnDesc
+  }
+
+  def toTTableSchema(schema: StructType): TTableSchema = {
+    val tTableSchema = new TTableSchema()
+    schema.zipWithIndex.foreach { case (f, i) =>
+      tTableSchema.addToColumns(toTColumnDesc(f, i))
     }
-    new TableSchema(schema.asJava)
+    tTableSchema
   }
 }
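Purely illustrative (a sketch, not in the patch): the new converter maps a Spark StructType directly to the thrift TTableSchema, carrying decimal precision and scale as type qualifiers, so no intermediate Hive TableSchema is built.

    import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.toTTableSchema
    import org.apache.spark.sql.types.{DecimalType, LongType, StructType, TimestampNTZType}

    val schema = new StructType()
      .add("id", LongType)
      .add("price", DecimalType(10, 2)) // precision/scale become TTypeQualifiers
      .add("ts", TimestampNTZType) // maps to TIMESTAMP_TYPE for now, per the TODO above

    val tSchema = toTTableSchema(schema)
    assert(tSchema.getColumnsSize == 3)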
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index bcb8ef0cdfb..d135fe7efbb 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -35,9 +35,10 @@ import com.google.common.io.Files
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
 import org.apache.hive.service.auth.PlainSaslHelper
-import org.apache.hive.service.cli.{FetchOrientation, FetchType, GetInfoType, RowSet}
+import org.apache.hive.service.cli.{CLIService, FetchOrientation, FetchType, GetInfoType, RowSetFactory}
 import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
 import org.apache.hive.service.rpc.thrift.TCLIService.Client
+import org.apache.hive.service.rpc.thrift.TRowSet
 import org.apache.thrift.protocol.TBinaryProtocol
 import org.apache.thrift.transport.TSocket
 import org.scalatest.BeforeAndAfterAll
@@ -123,7 +124,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test {
             1000,
             FetchType.QUERY_OUTPUT)
 
-          rows_next.numRows()
+          RowSetFactory.create(rows_next, sessionHandle.getProtocolVersion).numRows()
         }
 
         // Fetch result second time from first row
@@ -135,7 +136,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test {
             1000,
             FetchType.QUERY_OUTPUT)
 
-          rows_first.numRows()
+          RowSetFactory.create(rows_first, sessionHandle.getProtocolVersion).numRows()
         }
       }
     }
@@ -749,10 +750,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test {
   }
 
   test("ThriftCLIService FetchResults FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR") {
-    def checkResult(rows: RowSet, start: Long, end: Long): Unit = {
-      assert(rows.getStartOffset() == start)
-      assert(rows.numRows() == end - start)
-      rows.iterator.asScala.zip((start until end).iterator).foreach { case (row, v) =>
+    def checkResult(rows: TRowSet, start: Long, end: Long): Unit = {
+      val rowSet = RowSetFactory.create(rows, CLIService.SERVER_VERSION)
+      assert(rowSet.getStartOffset == start)
+      assert(rowSet.numRows() == end - start)
+      rowSet.iterator.asScala.zip((start until end).iterator).foreach { case (row, v) =>
         assert(row(0).asInstanceOf[Long] === v)
       }
     }
@@ -766,7 +768,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test {
         sessionHandle,
         "SELECT * FROM range(10)",
         confOverlay) // 10 rows result with sequence 0, 1, 2, ..., 9
-      var rows: RowSet = null
+      var rows: TRowSet = null
 
       // Fetch 5 rows with FETCH_NEXT
       rows = client.fetchResults(
@@ -868,7 +870,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test {
             FetchOrientation.FETCH_NEXT,
             1000,
             FetchType.QUERY_OUTPUT)
-          rows_next.numRows()
+          RowSetFactory.create(rows_next, sessionHandle.getProtocolVersion).numRows()
         }
       }
     }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
index c8bb6d9ee08..b61c91f3109 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
@@ -25,7 +25,7 @@ import scala.concurrent.duration._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hive.service.cli.OperationState
 import org.apache.hive.service.cli.session.{HiveSession, HiveSessionImpl}
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TTypeId}
 import org.mockito.Mockito.{doReturn, mock, spy, when, RETURNS_DEEP_STUBS}
 import org.mockito.invocation.InvocationOnMock
 
@@ -41,22 +41,28 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark
     val field1 = StructField("NULL", NullType)
     val field2 = StructField("(IF(true, NULL, NULL))", NullType)
     val tableSchema = StructType(Seq(field1, field2))
-    val columns = SparkExecuteStatementOperation.getTableSchema(tableSchema).getColumnDescriptors()
-    assert(columns.size() == 2)
-    assert(columns.get(0).getType().getName == "VOID")
-    assert(columns.get(1).getType().getName == "VOID")
+    val columns = SparkExecuteStatementOperation.toTTableSchema(tableSchema)
+    assert(columns.getColumnsSize == 2)
+    assert(columns.getColumns.get(0).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType
+      === TTypeId.NULL_TYPE)
+    assert(columns.getColumns.get(1).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType
+      === TTypeId.NULL_TYPE)
   }
 
   test("SPARK-20146 Comment should be preserved") {
     val field1 = StructField("column1", StringType).withComment("comment 1")
     val field2 = StructField("column2", IntegerType)
     val tableSchema = StructType(Seq(field1, field2))
-    val columns = SparkExecuteStatementOperation.getTableSchema(tableSchema).getColumnDescriptors()
-    assert(columns.size() == 2)
-    assert(columns.get(0).getType().getName == "STRING")
-    assert(columns.get(0).getComment() == "comment 1")
-    assert(columns.get(1).getType().getName == "INT")
-    assert(columns.get(1).getComment() == "")
+    val columns = SparkExecuteStatementOperation.toTTableSchema(tableSchema)
+    assert(columns.getColumnsSize == 2)
+    assert(columns.getColumns.get(0).getColumnName == "column1")
+    assert(columns.getColumns.get(0).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType
+      === TTypeId.STRING_TYPE)
+    assert(columns.getColumns.get(0).getComment == "comment 1")
+    assert(columns.getColumns.get(1).getColumnName == "column2")
+    assert(columns.getColumns.get(1).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType
+      === TTypeId.INT_TYPE)
+    assert(columns.getColumns.get(1).getComment == "")
   }
 
   Seq(
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index b5cfa04bab5..94d7318e620 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -128,14 +128,14 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
       val opHandle1 = exec("select current_user(), current_user")
       val rowSet1 = client.fetchResults(opHandle1)
-      rowSet1.toTRowSet.getColumns.forEach { col =>
+      rowSet1.getColumns.forEach { col =>
         assert(col.getStringVal.getValues.get(0) === clientUser)
       }
 
       exec(s"set ${SQLConf.ANSI_ENABLED.key}=true")
       exec(s"set ${SQLConf.ENFORCE_RESERVED_KEYWORDS.key}=true")
       val opHandle2 = exec("select current_user")
-      assert(client.fetchResults(opHandle2).toTRowSet.getColumns.get(0)
+      assert(client.fetchResults(opHandle2).getColumns.get(0)
         .getStringVal.getValues.get(0) === clientUser)
 
       val e = intercept[HiveSQLException](exec("select current_user()"))

