This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c82af8d5502 [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly

c82af8d5502 is described below

commit c82af8d5502d0e334fe2e628ce66435ec0e341e8
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri May 13 10:35:53 2022 +0800

    [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly

    ### What changes were proposed in this pull request?

    This PR is mainly a refactoring, aiming to let the Thrift server support TimestampNTZ and TimestampLTZ side by side in the future. Spark carries Hive dependencies of two kinds:

    - as a client, for accessing the Hive metastore, storage, etc. This is currently v2.3.9 and is best kept at a stable, lower version so that newer Hive metastore servers can still support it with backward compatibility.
    - as a server, for being accessed by Hive JDBC/Thrift clients such as Beeline. This is currently v3.1.2 and is best kept at a higher version so that more clients are supported.

    The problem is that we currently convert Spark results to `org.apache.hadoop.hive.serde2.thrift.Type` first and only then to `org.apache.hive.service.rpc.thrift.TTypeId`. The former cannot distinguish the two timestamp types: it has no `TIMESTAMPLOCALTZ_TYPE`. To avoid this, we take a shortcut and map Spark results to the thrift schema and row set directly. This also avoids some unnecessary type-to-type memory copies. Most of this functionality has been verified in [apache/kyuubi](https://github.com/apache/incubator-kyuubi) for several years.

    ### Why are the changes needed?

    To support more Spark data types through Hive JDBC.

    ### Does this PR introduce _any_ user-facing change?

    ### How was this patch tested?

    Existing unit tests should be sufficient.

    Closes #36373 from yaooqinn/SPARK-39041.
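As an editor's illustration (not part of the patch): a minimal Scala sketch of the direct-mapping idea the description refers to. The object and method names below are hypothetical; the actual logic lives in the new RowSetUtils.scala and the reworked SparkExecuteStatementOperation.scala.

```scala
import org.apache.hive.service.rpc.thrift.TTypeId
import org.apache.spark.sql.types._

object DirectTypeMapping {
  // Going straight from Spark's DataType to the thrift TTypeId skips the
  // intermediate serde2 Type enum, which has no TIMESTAMPLOCALTZ_TYPE value,
  // leaving room to map TimestampNTZ/LTZ to distinct thrift types later.
  def toTTypeId(dt: DataType): TTypeId = dt match {
    case NullType => TTypeId.NULL_TYPE
    case BooleanType => TTypeId.BOOLEAN_TYPE
    case ByteType => TTypeId.TINYINT_TYPE
    case ShortType => TTypeId.SMALLINT_TYPE
    case IntegerType => TTypeId.INT_TYPE
    case LongType => TTypeId.BIGINT_TYPE
    case FloatType => TTypeId.FLOAT_TYPE
    case DoubleType => TTypeId.DOUBLE_TYPE
    case _: DecimalType => TTypeId.DECIMAL_TYPE
    case StringType => TTypeId.STRING_TYPE
    case BinaryType => TTypeId.BINARY_TYPE
    case DateType => TTypeId.DATE_TYPE
    case TimestampType => TTypeId.TIMESTAMP_TYPE
    case _: ArrayType => TTypeId.ARRAY_TYPE
    case _: MapType => TTypeId.MAP_TYPE
    case _: StructType => TTypeId.STRUCT_TYPE
    case other => throw new IllegalArgumentException(s"Unsupported type: $other")
  }
}
```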
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Kent Yao <y...@apache.org> --- .../org/apache/hive/service/cli/CLIService.java | 14 +- .../apache/hive/service/cli/CLIServiceClient.java | 3 +- .../org/apache/hive/service/cli/ICLIService.java | 11 +- .../cli/operation/ExecuteStatementOperation.java | 21 - .../cli/operation/GetCatalogsOperation.java | 18 +- .../service/cli/operation/GetColumnsOperation.java | 10 +- .../cli/operation/GetCrossReferenceOperation.java | 10 +- .../cli/operation/GetFunctionsOperation.java | 10 +- .../cli/operation/GetPrimaryKeysOperation.java | 10 +- .../service/cli/operation/GetSchemasOperation.java | 10 +- .../cli/operation/GetTableTypesOperation.java | 10 +- .../service/cli/operation/GetTablesOperation.java | 10 +- .../cli/operation/GetTypeInfoOperation.java | 10 +- .../cli/operation/HiveCommandOperation.java | 214 ---------- .../hive/service/cli/operation/Operation.java | 12 +- .../service/cli/operation/OperationManager.java | 27 +- .../hive/service/cli/operation/SQLOperation.java | 450 --------------------- .../hive/service/cli/session/HiveSession.java | 6 +- .../hive/service/cli/session/HiveSessionImpl.java | 8 +- .../hive/service/cli/thrift/ThriftCLIService.java | 8 +- .../service/cli/thrift/ThriftCLIServiceClient.java | 10 +- .../spark/sql/hive/thriftserver/RowSetUtils.scala | 241 +++++++++++ .../SparkExecuteStatementOperation.scala | 185 ++++----- .../thriftserver/HiveThriftServer2Suites.scala | 20 +- .../SparkExecuteStatementOperationSuite.scala | 28 +- .../ThriftServerWithSparkContextSuite.scala | 4 +- 26 files changed, 454 insertions(+), 906 deletions(-) diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java index f4d07d10a43..4164ef42825 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java @@ -46,6 +46,8 @@ import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.rpc.thrift.TOperationHandle; import org.apache.hive.service.rpc.thrift.TProtocolVersion; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; import org.apache.hive.service.server.HiveServer2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -496,9 +498,9 @@ public class CLIService extends CompositeService implements ICLIService { * @see org.apache.hive.service.cli.ICLIService#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle) */ @Override - public TableSchema getResultSetMetadata(OperationHandle opHandle) + public TTableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { - TableSchema tableSchema = sessionManager.getOperationManager() + TTableSchema tableSchema = sessionManager.getOperationManager() .getOperation(opHandle).getParentSession().getResultSetMetadata(opHandle); LOG.debug(opHandle + ": getResultSetMetadata()"); return tableSchema; @@ -508,16 +510,16 @@ public class CLIService extends CompositeService implements ICLIService { * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) */ @Override - public RowSet fetchResults(OperationHandle opHandle) + public TRowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { return fetchResults(opHandle, 
Operation.DEFAULT_FETCH_ORIENTATION, Operation.DEFAULT_FETCH_MAX_ROWS, FetchType.QUERY_OUTPUT); } @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, - long maxRows, FetchType fetchType) throws HiveSQLException { - RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) + public TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException { + TRowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType); LOG.debug(opHandle + ": fetchResults()"); return rowSet; diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java index 684c666da63..964dd5a5899 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java @@ -20,6 +20,7 @@ package org.apache.hive.service.cli; import java.util.Collections; import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.rpc.thrift.TRowSet; /** @@ -35,7 +36,7 @@ public abstract class CLIServiceClient implements ICLIService { } @Override - public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { + public TRowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { // TODO: provide STATIC default value return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, DEFAULT_MAX_ROWS, FetchType.QUERY_OUTPUT); } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/ICLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/ICLIService.java index 42706f382a5..f896b7a6775 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/ICLIService.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/ICLIService.java @@ -19,11 +19,10 @@ package org.apache.hive.service.cli; import java.util.List; import java.util.Map; - - - import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.rpc.thrift.TOperationHandle; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; public interface ICLIService { @@ -86,13 +85,13 @@ public interface ICLIService { void closeOperation(OperationHandle opHandle) throws HiveSQLException; - TableSchema getResultSetMetadata(OperationHandle opHandle) + TTableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - RowSet fetchResults(OperationHandle opHandle) + TRowSet fetchResults(OperationHandle opHandle) throws HiveSQLException; - RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException; String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java index 96617dde131..815a369b6b2 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java +++ 
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java @@ -16,13 +16,9 @@ */ package org.apache.hive.service.cli.operation; -import java.sql.SQLException; import java.util.Map; -import org.apache.hadoop.hive.ql.processors.CommandProcessor; -import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.ql.session.OperationLog; -import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationType; import org.apache.hive.service.cli.session.HiveSession; @@ -39,23 +35,6 @@ public abstract class ExecuteStatementOperation extends Operation { return statement; } - public static ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout) - throws HiveSQLException { - String[] tokens = statement.trim().split("\\s+"); - CommandProcessor processor = null; - try { - processor = CommandProcessorFactory.getForHiveCommand(tokens, parentSession.getHiveConf()); - } catch (SQLException e) { - throw new HiveSQLException(e.getMessage(), e.getSQLState(), e); - } - if (processor == null) { - // runAsync, queryTimeout makes sense only for a SQLOperation - return new SQLOperation(parentSession, statement, confOverlay, runAsync, queryTimeout); - } - return new HiveCommandOperation(parentSession, statement, processor, confOverlay); - } - protected void registerCurrentOperationLog() { if (isOperationLogEnabled) { if (operationLog == null) { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java index 6f2deddecdc..ef4bbb45e8f 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java @@ -18,14 +18,10 @@ package org.apache.hive.service.cli.operation; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationState; -import org.apache.hive.service.cli.OperationType; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.RowSetFactory; -import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; /** * GetCatalogsOperation. 
@@ -61,20 +57,20 @@ public class GetCatalogsOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { - return RESULT_SET_SCHEMA; + public TTableSchema getResultSetSchema() throws HiveSQLException { + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java index d48ca6174be..250adc51f81 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java @@ -48,6 +48,8 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; /** * GetColumnsOperation. 
@@ -229,22 +231,22 @@ public class GetColumnsOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { + public TTableSchema getResultSetSchema() throws HiveSQLException { assertState(OperationState.FINISHED); - return RESULT_SET_SCHEMA; + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java index 9b149f0a5bf..3a29859a207 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; import java.util.List; @@ -140,21 +142,21 @@ public class GetCrossReferenceOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { + public TTableSchema getResultSetSchema() throws HiveSQLException { assertState(OperationState.FINISHED); - return RESULT_SET_SCHEMA; + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java index 9cb9ded930a..3f02f753bf8 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java @@ -37,6 +37,8 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import 
org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; import org.apache.thrift.TException; /** @@ -126,21 +128,21 @@ public class GetFunctionsOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { + public TTableSchema getResultSetSchema() throws HiveSQLException { assertState(OperationState.FINISHED); - return RESULT_SET_SCHEMA; + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java index 29842ad30ac..92732834297 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; import java.util.List; @@ -94,21 +96,21 @@ public class GetPrimaryKeysOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { + public TTableSchema getResultSetSchema() throws HiveSQLException { assertState(OperationState.FINISHED); - return RESULT_SET_SCHEMA; + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java index fb74d02773b..865e264bd5f 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java @@ 
-27,6 +27,8 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; /** * GetSchemasOperation. @@ -75,21 +77,21 @@ public class GetSchemasOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { + public TTableSchema getResultSetSchema() throws HiveSQLException { assertState(OperationState.FINISHED); - return RESULT_SET_SCHEMA; + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index 115e6651f73..b75eaec5ff6 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -28,6 +28,8 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; /** * GetTableTypesOperation. 
@@ -71,22 +73,22 @@ public class GetTableTypesOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { + public TTableSchema getResultSetSchema() throws HiveSQLException { assertState(OperationState.FINISHED); - return RESULT_SET_SCHEMA; + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java index 857f00a5e92..bd9f0814814 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java @@ -35,6 +35,8 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; /** * GetTablesOperation. 
@@ -122,21 +124,21 @@ public class GetTablesOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { + public TTableSchema getResultSetSchema() throws HiveSQLException { assertState(OperationState.FINISHED); - return RESULT_SET_SCHEMA; + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java index b480e548e78..ad692d46edd 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java @@ -27,6 +27,8 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; /** * GetTypeInfoOperation. @@ -121,21 +123,21 @@ public class GetTypeInfoOperation extends MetadataOperation { * @see org.apache.hive.service.cli.Operation#getResultSetSchema() */ @Override - public TableSchema getResultSetSchema() throws HiveSQLException { + public TTableSchema getResultSetSchema() throws HiveSQLException { assertState(OperationState.FINISHED); - return RESULT_SET_SCHEMA; + return RESULT_SET_SCHEMA.toTTableSchema(); } /* (non-Javadoc) * @see org.apache.hive.service.cli.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) */ @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { assertState(OperationState.FINISHED); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); } - return rowSet.extractSubset((int)maxRows); + return rowSet.extractSubset((int)maxRows).toTRowSet(); } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java deleted file mode 100644 index 173256d4782..00000000000 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.service.cli.operation; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.hadoop.hive.ql.processors.CommandProcessor; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hive.service.ServiceUtils; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationState; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.RowSetFactory; -import org.apache.hive.service.cli.TableSchema; -import org.apache.hive.service.cli.session.HiveSession; - -/** - * Executes a HiveCommand - */ -public class HiveCommandOperation extends ExecuteStatementOperation { - private CommandProcessor commandProcessor; - private TableSchema resultSchema = null; - - /** - * For processors other than Hive queries (Driver), they output to session.out (a temp file) - * first and the fetchOne/fetchN/fetchAll functions get the output from pipeIn. 
- */ - private BufferedReader resultReader; - - - protected HiveCommandOperation(HiveSession parentSession, String statement, - CommandProcessor commandProcessor, Map<String, String> confOverlay) { - super(parentSession, statement, confOverlay, false); - this.commandProcessor = commandProcessor; - setupSessionIO(parentSession.getSessionState()); - } - - private void setupSessionIO(SessionState sessionState) { - try { - LOG.info("Putting temp output to file " + sessionState.getTmpOutputFile().toString()); - sessionState.in = null; // hive server's session input stream is not used - // open a per-session file in auto-flush mode for writing temp results - sessionState.out = new PrintStream(new FileOutputStream(sessionState.getTmpOutputFile()), true, UTF_8.name()); - // TODO: for hadoop jobs, progress is printed out to session.err, - // we should find a way to feed back job progress to client - sessionState.err = new PrintStream(System.err, true, UTF_8.name()); - } catch (IOException e) { - LOG.error("Error in creating temp output file ", e); - try { - sessionState.in = null; - sessionState.out = new PrintStream(System.out, true, UTF_8.name()); - sessionState.err = new PrintStream(System.err, true, UTF_8.name()); - } catch (UnsupportedEncodingException ee) { - LOG.error("Error creating PrintStream", e); - ee.printStackTrace(); - sessionState.out = null; - sessionState.err = null; - } - } - } - - - private void tearDownSessionIO() { - ServiceUtils.cleanup(LOG, - parentSession.getSessionState().out, parentSession.getSessionState().err); - } - - @Override - public void runInternal() throws HiveSQLException { - setState(OperationState.RUNNING); - try { - String command = getStatement().trim(); - String[] tokens = statement.split("\\s"); - String commandArgs = command.substring(tokens[0].length()).trim(); - - CommandProcessorResponse response = commandProcessor.run(commandArgs); - int returnCode = response.getResponseCode(); - if (returnCode != 0) { - throw toSQLException("Error while processing statement", response); - } - Schema schema = response.getSchema(); - if (schema != null) { - setHasResultSet(true); - resultSchema = new TableSchema(schema); - } else { - setHasResultSet(false); - resultSchema = new TableSchema(); - } - } catch (HiveSQLException e) { - setState(OperationState.ERROR); - throw e; - } catch (Exception e) { - setState(OperationState.ERROR); - throw new HiveSQLException("Error running query: " + e.toString(), e); - } - setState(OperationState.FINISHED); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.operation.Operation#close() - */ - @Override - public void close() throws HiveSQLException { - setState(OperationState.CLOSED); - tearDownSessionIO(); - cleanTmpFile(); - cleanupOperationLog(); - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.operation.Operation#getResultSetSchema() - */ - @Override - public TableSchema getResultSetSchema() throws HiveSQLException { - return resultSchema; - } - - /* (non-Javadoc) - * @see org.apache.hive.service.cli.operation.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long) - */ - @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - validateDefaultFetchOrientation(orientation); - if (orientation.equals(FetchOrientation.FETCH_FIRST)) { - resetResultReader(); - } - List<String> rows = readResults((int) maxRows); - RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false); - - for (String row : rows) { - 
rowSet.addRow(new String[] {row}); - } - return rowSet; - } - - /** - * Reads the temporary results for non-Hive (non-Driver) commands to the - * resulting List of strings. - * @param nLines number of lines read at once. If it is <= 0, then read all lines. - */ - private List<String> readResults(int nLines) throws HiveSQLException { - if (resultReader == null) { - SessionState sessionState = getParentSession().getSessionState(); - File tmp = sessionState.getTmpOutputFile(); - try { - resultReader = new BufferedReader(new FileReader(tmp)); - } catch (FileNotFoundException e) { - LOG.error("File " + tmp + " not found. ", e); - throw new HiveSQLException(e); - } - } - List<String> results = new ArrayList<String>(); - - for (int i = 0; i < nLines || nLines <= 0; ++i) { - try { - String line = resultReader.readLine(); - if (line == null) { - // reached the end of the result file - break; - } else { - results.add(line); - } - } catch (IOException e) { - LOG.error("Reading temp results encountered an exception: ", e); - throw new HiveSQLException(e); - } - } - return results; - } - - private void cleanTmpFile() { - resetResultReader(); - SessionState sessionState = getParentSession().getSessionState(); - sessionState.deleteTmpOutputFile(); - sessionState.deleteTmpErrOutputFile(); - } - - private void resetResultReader() { - if (resultReader != null) { - ServiceUtils.cleanup(LOG, resultReader); - resultReader = null; - } - } -} diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java index 9651701f9c3..ad42925207d 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java @@ -34,10 +34,10 @@ import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.OperationState; import org.apache.hive.service.cli.OperationStatus; import org.apache.hive.service.cli.OperationType; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.rpc.thrift.TProtocolVersion; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -302,13 +302,9 @@ public abstract class Operation { cleanupOperationLog(); } - public abstract TableSchema getResultSetSchema() throws HiveSQLException; + public abstract TTableSchema getResultSetSchema() throws HiveSQLException; - public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException; - - public RowSet getNextRowSet() throws HiveSQLException { - return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS); - } + public abstract TRowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException; /** * Verify if the given fetch orientation is part of the default orientation types. 
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java index 40daa1ff493..5aede567e95 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -37,6 +37,8 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; import org.apache.logging.log4j.core.appender.AbstractWriterAppender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,20 +87,10 @@ public class OperationManager extends AbstractService { ap.start(); } - public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, - String statement, Map<String, String> confOverlay, boolean runAsync) - throws HiveSQLException { - ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation - .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, 0); - addOperation(executeStatementOperation); - return executeStatementOperation; - } - public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { - return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync, - queryTimeout); + throw new UnsupportedOperationException(); } public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) { @@ -230,23 +222,18 @@ public class OperationManager extends AbstractService { operation.close(); } - public TableSchema getOperationResultSetSchema(OperationHandle opHandle) + public TTableSchema getOperationResultSetSchema(OperationHandle opHandle) throws HiveSQLException { return getOperation(opHandle).getResultSetSchema(); } - public RowSet getOperationNextRowSet(OperationHandle opHandle) - throws HiveSQLException { - return getOperation(opHandle).getNextRowSet(); - } - - public RowSet getOperationNextRowSet(OperationHandle opHandle, + public TRowSet getOperationNextRowSet(OperationHandle opHandle, FetchOrientation orientation, long maxRows) throws HiveSQLException { return getOperation(opHandle).getNextRowSet(orientation, maxRows); } - public RowSet getOperationLogRowSet(OperationHandle opHandle, + public TRowSet getOperationLogRowSet(OperationHandle opHandle, FetchOrientation orientation, long maxRows) throws HiveSQLException { // get the OperationLog object from the operation @@ -272,7 +259,7 @@ public class OperationManager extends AbstractService { rowSet.addRow(new String[] {log}); } - return rowSet; + return rowSet.toTRowSet(); } private boolean isFetchFirst(FetchOrientation fetchOrientation) { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java deleted file mode 100644 index ce704b281df..00000000000 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ /dev/null @@ -1,450 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.service.cli.operation; - -import java.io.IOException; -import java.io.Serializable; -import java.security.PrivilegedExceptionAction; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.hadoop.hive.ql.CommandNeedRetryException; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.ExplainTask; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.AbstractSerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationState; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.RowSetFactory; -import org.apache.hive.service.cli.TableSchema; -import org.apache.hive.service.cli.session.HiveSession; -import org.apache.hive.service.server.ThreadWithGarbageCleanup; - -/** - * SQLOperation. - * - */ -public class SQLOperation extends ExecuteStatementOperation { - - private Driver driver = null; - private CommandProcessorResponse response; - private TableSchema resultSchema = null; - private Schema mResultSchema = null; - private AbstractSerDe serde = null; - private boolean fetchStarted = false; - - public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay, - boolean runInBackground, long queryTimeout) { - // TODO: call setRemoteUser in ExecuteStatementOperation or higher. 
- super(parentSession, statement, confOverlay, runInBackground); - } - - /*** - * Compile the query and extract metadata - * @param queryState - * @throws HiveSQLException - */ - public void prepare(QueryState queryState) throws HiveSQLException { - setState(OperationState.RUNNING); - - try { - driver = new Driver(queryState, getParentSession().getUserName()); - - // set the operation handle information in Driver, so that thrift API users - // can use the operation handle they receive, to lookup query information in - // Yarn ATS - String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier() - .toTHandleIdentifier().getGuid()).trim(); - driver.setOperationId(guid64); - - // In Hive server mode, we are not able to retry in the FetchTask - // case, when calling fetch queries since execute() has returned. - // For now, we disable the test attempts. - driver.setTryCount(Integer.MAX_VALUE); - - response = driver.compileAndRespond(statement); - if (0 != response.getResponseCode()) { - throw toSQLException("Error while compiling statement", response); - } - - mResultSchema = driver.getSchema(); - - // hasResultSet should be true only if the query has a FetchTask - // "explain" is an exception for now - if(driver.getPlan().getFetchTask() != null) { - //Schema has to be set - if (mResultSchema == null || !mResultSchema.isSetFieldSchemas()) { - throw new HiveSQLException("Error compiling query: Schema and FieldSchema " + - "should be set when query plan has a FetchTask"); - } - resultSchema = new TableSchema(mResultSchema); - setHasResultSet(true); - } else { - setHasResultSet(false); - } - // Set hasResultSet true if the plan has ExplainTask - // TODO explain should use a FetchTask for reading - for (Task<? extends Serializable> task: driver.getPlan().getRootTasks()) { - if (task.getClass() == ExplainTask.class) { - resultSchema = new TableSchema(mResultSchema); - setHasResultSet(true); - break; - } - } - } catch (HiveSQLException e) { - setState(OperationState.ERROR); - throw e; - } catch (Exception e) { - setState(OperationState.ERROR); - throw new HiveSQLException("Error running query: " + e.toString(), e); - } - } - - private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException { - try { - // In Hive server mode, we are not able to retry in the FetchTask - // case, when calling fetch queries since execute() has returned. - // For now, we disable the test attempts. - driver.setTryCount(Integer.MAX_VALUE); - response = driver.run(); - if (0 != response.getResponseCode()) { - throw toSQLException("Error while processing statement", response); - } - } catch (HiveSQLException e) { - // If the operation was cancelled by another thread or timed out, - // Driver#run will return a non-zero response code. 
- // We will simply return if the operation state is CANCELED or TIMEDOUT, - // otherwise throw an exception - if (getStatus().getState() == OperationState.CANCELED || - getStatus().getState() == OperationState.TIMEDOUT) { - return; - } - else { - setState(OperationState.ERROR); - throw e; - } - } catch (Exception e) { - setState(OperationState.ERROR); - throw new HiveSQLException("Error running query: " + e.toString(), e); - } - setState(OperationState.FINISHED); - } - - @Override - public void runInternal() throws HiveSQLException { - setState(OperationState.PENDING); - final HiveConf opConfig = getConfigForOperation(); - prepare(queryState); - if (!shouldRunAsync()) { - runQuery(opConfig); - } else { - // We'll pass ThreadLocals in the background thread from the foreground (handler) thread - final SessionState parentSessionState = SessionState.get(); - // ThreadLocal Hive object needs to be set in background thread. - // The metastore client in Hive is associated with right user. - final Hive parentHive = getSessionHive(); - // Current UGI will get used by metastore when metsatore is in embedded mode - // So this needs to get passed to the new background thread - final UserGroupInformation currentUGI = getCurrentUGI(opConfig); - // Runnable impl to call runInternal asynchronously, - // from a different thread - Runnable backgroundOperation = () -> { - PrivilegedExceptionAction<Object> doAsAction = () -> { - Hive.set(parentHive); - SessionState.setCurrentSessionState(parentSessionState); - // Set current OperationLog in this async thread for keeping on saving query log. - registerCurrentOperationLog(); - try { - runQuery(opConfig); - } catch (HiveSQLException e) { - setOperationException(e); - LOG.error("Error running hive query: ", e); - } finally { - unregisterOperationLog(); - } - return null; - }; - - try { - currentUGI.doAs(doAsAction); - } catch (Exception e) { - setOperationException(new HiveSQLException(e)); - LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); - } - finally { - /** - * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup - * when this thread is garbage collected later. 
- * @see ThreadWithGarbageCleanup#finalize() - */ - if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { - ThreadWithGarbageCleanup currentThread = - (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); - currentThread.cacheThreadLocalRawStore(); - } - } - }; - try { - // This submit blocks if no background threads are available to run this operation - Future<?> backgroundHandle = - getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation); - setBackgroundHandle(backgroundHandle); - } catch (RejectedExecutionException rejected) { - setState(OperationState.ERROR); - throw new HiveSQLException("The background threadpool cannot accept" + - " new task for execution, please retry the operation", rejected); - } - } - } - - /** - * Returns the current UGI on the stack - * @param opConfig - * @return UserGroupInformation - * @throws HiveSQLException - */ - private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException { - try { - return Utils.getUGI(); - } catch (Exception e) { - throw new HiveSQLException("Unable to get current user", e); - } - } - - /** - * Returns the ThreadLocal Hive for the current thread - * @return Hive - * @throws HiveSQLException - */ - private Hive getSessionHive() throws HiveSQLException { - try { - return Hive.get(); - } catch (HiveException e) { - throw new HiveSQLException("Failed to get ThreadLocal Hive object", e); - } - } - - private void cleanup(OperationState state) throws HiveSQLException { - setState(state); - if (shouldRunAsync()) { - Future<?> backgroundHandle = getBackgroundHandle(); - if (backgroundHandle != null) { - backgroundHandle.cancel(true); - } - } - if (driver != null) { - driver.close(); - driver.destroy(); - } - driver = null; - - SessionState ss = SessionState.get(); - if (ss.getTmpOutputFile() != null) { - ss.getTmpOutputFile().delete(); - } - } - - @Override - public void cancel() throws HiveSQLException { - cleanup(OperationState.CANCELED); - } - - @Override - public void close() throws HiveSQLException { - cleanup(OperationState.CLOSED); - cleanupOperationLog(); - } - - @Override - public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); - if (resultSchema == null) { - resultSchema = new TableSchema(driver.getSchema()); - } - return resultSchema; - } - - private final transient List<Object> convey = new ArrayList<Object>(); - - @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - validateDefaultFetchOrientation(orientation); - assertState(OperationState.FINISHED); - - RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false); - - try { - /* if client is requesting fetch-from-start and its not the first time reading from this operation - * then reset the fetch position to beginning - */ - if (orientation.equals(FetchOrientation.FETCH_FIRST) && fetchStarted) { - driver.resetFetch(); - } - fetchStarted = true; - driver.setMaxRows((int) maxRows); - if (driver.getResults(convey)) { - return decode(convey, rowSet); - } - return rowSet; - } catch (IOException e) { - throw new HiveSQLException(e); - } catch (CommandNeedRetryException e) { - throw new HiveSQLException(e); - } catch (Exception e) { - throw new HiveSQLException(e); - } finally { - convey.clear(); - } - } - - private RowSet decode(List<Object> rows, RowSet rowSet) throws Exception { - if (driver.isFetchingTable()) { - return prepareFromRow(rows, rowSet); - } - 
return decodeFromString(rows, rowSet); - } - - // already encoded to thrift-able object in ThriftFormatter - private RowSet prepareFromRow(List<Object> rows, RowSet rowSet) throws Exception { - for (Object row : rows) { - rowSet.addRow((Object[]) row); - } - return rowSet; - } - - private RowSet decodeFromString(List<Object> rows, RowSet rowSet) - throws SQLException, SerDeException { - getSerDe(); - StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector(); - List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs(); - - Object[] deserializedFields = new Object[fieldRefs.size()]; - Object rowObj; - ObjectInspector fieldOI; - - int protocol = getProtocolVersion().getValue(); - for (Object rowString : rows) { - rowObj = serde.deserialize(new BytesWritable(((String)rowString).getBytes(UTF_8))); - for (int i = 0; i < fieldRefs.size(); i++) { - StructField fieldRef = fieldRefs.get(i); - fieldOI = fieldRef.getFieldObjectInspector(); - Object fieldData = soi.getStructFieldData(rowObj, fieldRef); - deserializedFields[i] = SerDeUtils.toThriftPayload(fieldData, fieldOI, protocol); - } - rowSet.addRow(deserializedFields); - } - return rowSet; - } - - private AbstractSerDe getSerDe() throws SQLException { - if (serde != null) { - return serde; - } - try { - List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas(); - StringBuilder namesSb = new StringBuilder(); - StringBuilder typesSb = new StringBuilder(); - - if (fieldSchemas != null && !fieldSchemas.isEmpty()) { - for (int pos = 0; pos < fieldSchemas.size(); pos++) { - if (pos != 0) { - namesSb.append(","); - typesSb.append(","); - } - namesSb.append(fieldSchemas.get(pos).getName()); - typesSb.append(fieldSchemas.get(pos).getType()); - } - } - String names = namesSb.toString(); - String types = typesSb.toString(); - - serde = new LazySimpleSerDe(); - Properties props = new Properties(); - if (names.length() > 0) { - LOG.debug("Column names: " + names); - props.setProperty(serdeConstants.LIST_COLUMNS, names); - } - if (types.length() > 0) { - LOG.debug("Column types: " + types); - props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types); - } - SerDeUtils.initializeSerDe(serde, new HiveConf(), props, null); - - } catch (Exception ex) { - ex.printStackTrace(); - throw new SQLException("Could not create ResultSet: " + ex.getMessage(), ex); - } - return serde; - } - - /** - * If there are query specific settings to overlay, then create a copy of config - * There are two cases we need to clone the session config that's being passed to hive driver - * 1. Async query - - * If the client changes a config setting, that shouldn't reflect in the execution already underway - * 2. 
confOverlay - - * The query specific settings should only be applied to the query config and not session - * @return new configuration - * @throws HiveSQLException - */ - private HiveConf getConfigForOperation() throws HiveSQLException { - HiveConf sqlOperationConf = getParentSession().getHiveConf(); - if (!getConfOverlay().isEmpty() || shouldRunAsync()) { - // clone the parent session config for this query - sqlOperationConf = new HiveConf(sqlOperationConf); - - // apply overlay query specific settings, if any - for (Map.Entry<String, String> confEntry : getConfOverlay().entrySet()) { - try { - sqlOperationConf.verifyAndSet(confEntry.getKey(), confEntry.getValue()); - } catch (IllegalArgumentException e) { - throw new HiveSQLException("Error applying statement specific settings", e); - } - } - } - return sqlOperationConf; - } -} diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java index ca0540e5aac..b1bb2096822 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java @@ -23,6 +23,8 @@ import java.util.Map; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.*; +import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.hive.service.rpc.thrift.TTableSchema; public interface HiveSession extends HiveSessionBase { @@ -182,10 +184,10 @@ public interface HiveSession extends HiveSessionBase { void closeOperation(OperationHandle opHandle) throws HiveSQLException; - TableSchema getResultSetMetadata(OperationHandle opHandle) + TTableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException; String getDelegationToken(HiveAuthFactory authFactory, String owner, diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 8e1e500ff78..4ac3c1e5887 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -52,9 +52,7 @@ import org.apache.hive.service.cli.GetInfoType; import org.apache.hive.service.cli.GetInfoValue; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.operation.ExecuteStatementOperation; import org.apache.hive.service.cli.operation.GetCatalogsOperation; import org.apache.hive.service.cli.operation.GetColumnsOperation; @@ -68,6 +66,8 @@ import org.apache.hive.service.cli.operation.MetadataOperation; import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.rpc.thrift.TProtocolVersion; +import org.apache.hive.service.rpc.thrift.TRowSet; +import 
org.apache.hive.service.rpc.thrift.TTableSchema; import org.apache.hive.service.server.ThreadWithGarbageCleanup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -789,7 +789,7 @@ public class HiveSessionImpl implements HiveSession { } @Override - public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { + public TTableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { acquire(true); try { return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle); @@ -799,7 +799,7 @@ public class HiveSessionImpl implements HiveSession { } @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + public TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException { acquire(true); try { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 4a223c8666a..3517df908b2 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -624,8 +624,8 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe throws TException { TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp(); try { - TableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle())); - resp.setSchema(schema.toTTableSchema()); + TTableSchema schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle())); + resp.setSchema(schema); resp.setStatus(OK_STATUS); } catch (Exception e) { LOG.warn("Error getting result set metadata: ", e); @@ -638,12 +638,12 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { TFetchResultsResp resp = new TFetchResultsResp(); try { - RowSet rowSet = cliService.fetchResults( + TRowSet rowSet = cliService.fetchResults( new OperationHandle(req.getOperationHandle()), FetchOrientation.getFetchOrientation(req.getOrientation()), req.getMaxRows(), FetchType.getFetchType(req.getFetchType())); - resp.setResults(rowSet.toTRowSet()); + resp.setResults(rowSet); resp.setHasMoreRows(false); resp.setStatus(OK_STATUS); } catch (Exception e) { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 6fc2d3a5fa9..4cfb48e0640 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -362,13 +362,13 @@ public class ThriftCLIServiceClient extends CLIServiceClient { * @see org.apache.hive.service.cli.ICLIService#getResultSetMetadata(org.apache.hive.service.cli.OperationHandle) */ @Override - public TableSchema getResultSetMetadata(OperationHandle opHandle) + public TTableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { try { TGetResultSetMetadataReq req = new TGetResultSetMetadataReq(opHandle.toTOperationHandle()); TGetResultSetMetadataResp resp = cliService.GetResultSetMetadata(req); 
checkStatus(resp.getStatus()); - return new TableSchema(resp.getSchema()); + return resp.getSchema(); } catch (HiveSQLException e) { throw e; } catch (Exception e) { @@ -377,7 +377,7 @@ public class ThriftCLIServiceClient extends CLIServiceClient { } @Override - public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, + public TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException { try { TFetchResultsReq req = new TFetchResultsReq(); @@ -387,7 +387,7 @@ public class ThriftCLIServiceClient extends CLIServiceClient { req.setFetchType(fetchType.toTFetchType()); TFetchResultsResp resp = cliService.FetchResults(req); checkStatus(resp.getStatus()); - return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion()); + return resp.getResults(); } catch (HiveSQLException e) { throw e; } catch (Exception e) { @@ -399,7 +399,7 @@ public class ThriftCLIServiceClient extends CLIServiceClient { * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle) */ @Override - public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { + public TRowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { // TODO: set the correct default fetch size return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT); } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala new file mode 100644 index 00000000000..9625021f392 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.thriftserver + +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.HiveResult.{toHiveString, TimeFormatters} +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType} + +object RowSetUtils { + + implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = { + ByteBuffer.wrap(bitSet.toByteArray) + } + + def toTRowSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + protocolVersion: TProtocolVersion, + timeFormatters: TimeFormatters): TRowSet = { + if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) { + toRowBasedSet(startRowOffSet, rows, schema, timeFormatters) + } else { + toColumnBasedSet(startRowOffSet, rows, schema, timeFormatters) + } + } + + private def toRowBasedSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + timeFormatters: TimeFormatters): TRowSet = { + var i = 0 + val rowSize = rows.length + val tRows = new java.util.ArrayList[TRow](rowSize) + while (i < rowSize) { + val row = rows(i) + val tRow = new TRow() + var j = 0 + val columnSize = row.length + while (j < columnSize) { + val columnValue = toTColumnValue(j, row, schema(j), timeFormatters) + tRow.addToColVals(columnValue) + j += 1 + } + i += 1 + tRows.add(tRow) + } + new TRowSet(startRowOffSet, tRows) + } + + private def toColumnBasedSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + timeFormatters: TimeFormatters): TRowSet = { + val rowSize = rows.length + val tRowSet = new TRowSet(startRowOffSet, new java.util.ArrayList[TRow](rowSize)) + var i = 0 + val columnSize = schema.length + while (i < columnSize) { + val tColumn = toTColumn(rows, i, schema(i), timeFormatters) + tRowSet.addToColumns(tColumn) + i += 1 + } + tRowSet + } + + private def toTColumn( + rows: Seq[Row], ordinal: Int, typ: DataType, timeFormatters: TimeFormatters): TColumn = { + val nulls = new java.util.BitSet() + typ match { + case BooleanType => + val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true) + TColumn.boolVal(new TBoolColumn(values, nulls)) + + case ByteType => + val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte) + TColumn.byteVal(new TByteColumn(values, nulls)) + + case ShortType => + val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort) + TColumn.i16Val(new TI16Column(values, nulls)) + + case IntegerType => + val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0) + TColumn.i32Val(new TI32Column(values, nulls)) + + case LongType => + val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L) + TColumn.i64Val(new TI64Column(values, nulls)) + + case FloatType => + val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat) + .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava + TColumn.doubleVal(new TDoubleColumn(values, nulls)) + + case DoubleType => + val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble) + TColumn.doubleVal(new TDoubleColumn(values, nulls)) + + case StringType => + val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "") + TColumn.stringVal(new TStringColumn(values, nulls)) + + case BinaryType => + val values = 
getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array.empty[Byte]) + .asScala + .map(ByteBuffer.wrap) + .asJava + TColumn.binaryVal(new TBinaryColumn(values, nulls)) + + case _ => + var i = 0 + val rowSize = rows.length + val values = new java.util.ArrayList[String](rowSize) + while (i < rowSize) { + val row = rows(i) + nulls.set(i, row.isNullAt(ordinal)) + val value = if (row.isNullAt(ordinal)) { + "" + } else { + toHiveString((row.get(ordinal), typ), nested = true, timeFormatters) + } + values.add(value) + i += 1 + } + TColumn.stringVal(new TStringColumn(values, nulls)) + } + } + + private def getOrSetAsNull[T]( + rows: Seq[Row], + ordinal: Int, + nulls: java.util.BitSet, + defaultVal: T): java.util.List[T] = { + val size = rows.length + val ret = new java.util.ArrayList[T](size) + var idx = 0 + while (idx < size) { + val row = rows(idx) + if (row.isNullAt(ordinal)) { + nulls.set(idx, true) + ret.add(idx, defaultVal) + } else { + ret.add(idx, row.getAs[T](ordinal)) + } + idx += 1 + } + ret + } + + private def toTColumnValue( + ordinal: Int, + row: Row, + dataType: DataType, + timeFormatters: TimeFormatters): TColumnValue = { + dataType match { + case BooleanType => + val boolValue = new TBoolValue + if (!row.isNullAt(ordinal)) boolValue.setValue(row.getBoolean(ordinal)) + TColumnValue.boolVal(boolValue) + + case ByteType => + val byteValue = new TByteValue + if (!row.isNullAt(ordinal)) byteValue.setValue(row.getByte(ordinal)) + TColumnValue.byteVal(byteValue) + + case ShortType => + val tI16Value = new TI16Value + if (!row.isNullAt(ordinal)) tI16Value.setValue(row.getShort(ordinal)) + TColumnValue.i16Val(tI16Value) + + case IntegerType => + val tI32Value = new TI32Value + if (!row.isNullAt(ordinal)) tI32Value.setValue(row.getInt(ordinal)) + TColumnValue.i32Val(tI32Value) + + case LongType => + val tI64Value = new TI64Value + if (!row.isNullAt(ordinal)) tI64Value.setValue(row.getLong(ordinal)) + TColumnValue.i64Val(tI64Value) + + case FloatType => + val tDoubleValue = new TDoubleValue + if (!row.isNullAt(ordinal)) { + // Floats are converted to doubles during thrift transportation. + // Passing float to Double.valueOf causes precision loss, e.g. + // scala> java.lang.Double.valueOf(0.1f) + // res0: Double = 0.10000000149011612 + // + // hereby toString is called ahead. 
+ // scala> java.lang.Double.valueOf(0.1f.toString) + // res1: Double = 0.1 + val doubleValue = java.lang.Double.valueOf(row.getFloat(ordinal).toString) + tDoubleValue.setValue(doubleValue) + } + TColumnValue.doubleVal(tDoubleValue) + + case DoubleType => + val tDoubleValue = new TDoubleValue + if (!row.isNullAt(ordinal)) tDoubleValue.setValue(row.getDouble(ordinal)) + TColumnValue.doubleVal(tDoubleValue) + + case StringType => + val tStringValue = new TStringValue + if (!row.isNullAt(ordinal)) tStringValue.setValue(row.getString(ordinal)) + TColumnValue.stringVal(tStringValue) + + case _ => + val tStrValue = new TStringValue + if (!row.isNullAt(ordinal)) { + val value = toHiveString((row.get(ordinal), dataType), nested = false, timeFormatters) + tStrValue.setValue(value) + } + TColumnValue.stringVal(tStrValue) + } + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 4f4088990a9..2c77e00c46c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -18,26 +18,23 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction -import java.util.{Arrays, Map => JMap} +import java.util.{Collections, Map => JMap} import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit} import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.shims.Utils import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession +import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId, TTypeQualifiers, TTypeQualifierValue} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext} -import org.apache.spark.sql.execution.HiveResult.{getTimeFormatters, toHiveString, TimeFormatters} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.VariableSubstitution +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.execution.HiveResult.getTimeFormatters +import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.util.{Utils => SparkUtils} private[hive] class SparkExecuteStatementOperation( @@ -71,62 +68,20 @@ private[hive] class SparkExecuteStatementOperation( private var result: DataFrame = _ - private var iter: FetchIterator[SparkRow] = _ + private var iter: FetchIterator[Row] = _ private var dataTypes: Array[DataType] = _ - private lazy val resultSchema: TableSchema = { + private lazy val resultSchema: TTableSchema = { if (result == null || result.schema.isEmpty) { - new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) + val sparkType = new StructType().add("Result", "string") + SparkExecuteStatementOperation.toTTableSchema(sparkType) } else { logInfo(s"Result Schema: ${result.schema}") - 
SparkExecuteStatementOperation.getTableSchema(result.schema) + SparkExecuteStatementOperation.toTTableSchema(result.schema) } } - def addNonNullColumnValue( - from: SparkRow, - to: ArrayBuffer[Any], - ordinal: Int, - timeFormatters: TimeFormatters): Unit = { - dataTypes(ordinal) match { - case StringType => - to += from.getString(ordinal) - case IntegerType => - to += from.getInt(ordinal) - case BooleanType => - to += from.getBoolean(ordinal) - case DoubleType => - to += from.getDouble(ordinal) - case FloatType => - to += from.getFloat(ordinal) - case DecimalType() => - to += from.getDecimal(ordinal) - case LongType => - to += from.getLong(ordinal) - case ByteType => - to += from.getByte(ordinal) - case ShortType => - to += from.getShort(ordinal) - case BinaryType => - to += from.getAs[Array[Byte]](ordinal) - // SPARK-31859, SPARK-31861: Date and Timestamp need to be turned to String here to: - // - respect spark.sql.session.timeZone - // - work with spark.sql.datetime.java8API.enabled - // These types have always been sent over the wire as string, converted later. - case _: DateType | _: TimestampType => - to += toHiveString((from.get(ordinal), dataTypes(ordinal)), false, timeFormatters) - case CalendarIntervalType => - to += toHiveString( - (from.getAs[CalendarInterval](ordinal), CalendarIntervalType), - false, - timeFormatters) - case _: ArrayType | _: StructType | _: MapType | _: UserDefinedType[_] | - _: AnsiIntervalType | _: TimestampNTZType => - to += toHiveString((from.get(ordinal), dataTypes(ordinal)), false, timeFormatters) - } - } - - def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties { + def getNextRowSet(order: FetchOrientation, maxRowsL: Long): TRowSet = withLocalProperties { try { sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel) getNextRowSetInternal(order, maxRowsL) @@ -137,13 +92,12 @@ private[hive] class SparkExecuteStatementOperation( private def getNextRowSetInternal( order: FetchOrientation, - maxRowsL: Long): RowSet = withLocalProperties { + maxRowsL: Long): TRowSet = withLocalProperties { log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " + s"with ${statementId}") validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) - val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion, false) if (order.equals(FetchOrientation.FETCH_FIRST)) { iter.fetchAbsolute(0) @@ -152,36 +106,15 @@ private[hive] class SparkExecuteStatementOperation( } else { iter.fetchNext() } - resultRowSet.setStartOffset(iter.getPosition) - if (!iter.hasNext) { - resultRowSet - } else { - val timeFormatters = getTimeFormatters - // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int - val maxRows = maxRowsL.toInt - var curRow = 0 - while (curRow < maxRows && iter.hasNext) { - val sparkRow = iter.next() - val row = ArrayBuffer[Any]() - var curCol = 0 - while (curCol < sparkRow.length) { - if (sparkRow.isNullAt(curCol)) { - row += null - } else { - addNonNullColumnValue(sparkRow, row, curCol, timeFormatters) - } - curCol += 1 - } - resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]]) - curRow += 1 - } - log.info(s"Returning result set with ${curRow} rows from offsets " + - s"[${iter.getFetchStart}, ${iter.getPosition}) with $statementId") - resultRowSet - } + val maxRows = maxRowsL.toInt + val offset = iter.getPosition + val rows = iter.take(maxRows).toList + log.info(s"Returning result set 
with ${rows.length} rows from offsets " + + s"[${iter.getFetchStart}, ${offset}) with $statementId") + RowSetUtils.toTRowSet(offset, rows, dataTypes, getProtocolVersion, getTimeFormatters) } - def getResultSetSchema: TableSchema = resultSchema + def getResultSetSchema: TTableSchema = resultSchema override def runInternal(): Unit = { setState(OperationState.PENDING) @@ -293,11 +226,11 @@ private[hive] class SparkExecuteStatementOperation( HiveThriftServer2.eventManager.onStatementParsed(statementId, result.queryExecution.toString()) iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) { - new IterableFetchIterator[SparkRow](new Iterable[SparkRow] { - override def iterator: Iterator[SparkRow] = result.toLocalIterator.asScala + new IterableFetchIterator[Row](new Iterable[Row] { + override def iterator: Iterator[Row] = result.toLocalIterator.asScala }) } else { - new ArrayFetchIterator[SparkRow](result.collect()) + new ArrayFetchIterator[Row](result.collect()) } dataTypes = result.schema.fields.map(_.dataType) } catch { @@ -373,17 +306,69 @@ private[hive] class SparkExecuteStatementOperation( } object SparkExecuteStatementOperation { - def getTableSchema(structType: StructType): TableSchema = { - val schema = structType.map { field => - val attrTypeString = field.dataType match { - case CalendarIntervalType => StringType.catalogString - case _: YearMonthIntervalType => "interval_year_month" - case _: DayTimeIntervalType => "interval_day_time" - case _: TimestampNTZType => "timestamp" - case other => other.catalogString - } - new FieldSchema(field.name, attrTypeString, field.getComment.getOrElse("")) + + def toTTypeId(typ: DataType): TTypeId = typ match { + case NullType => TTypeId.NULL_TYPE + case BooleanType => TTypeId.BOOLEAN_TYPE + case ByteType => TTypeId.TINYINT_TYPE + case ShortType => TTypeId.SMALLINT_TYPE + case IntegerType => TTypeId.INT_TYPE + case LongType => TTypeId.BIGINT_TYPE + case FloatType => TTypeId.FLOAT_TYPE + case DoubleType => TTypeId.DOUBLE_TYPE + case StringType => TTypeId.STRING_TYPE + case _: DecimalType => TTypeId.DECIMAL_TYPE + case DateType => TTypeId.DATE_TYPE + // TODO: Shall use TIMESTAMPLOCALTZ_TYPE, keep AS-IS now for + // unnecessary behavior change + case TimestampType => TTypeId.TIMESTAMP_TYPE + case TimestampNTZType => TTypeId.TIMESTAMP_TYPE + case BinaryType => TTypeId.BINARY_TYPE + case CalendarIntervalType => TTypeId.STRING_TYPE + case _: DayTimeIntervalType => TTypeId.INTERVAL_DAY_TIME_TYPE + case _: YearMonthIntervalType => TTypeId.INTERVAL_YEAR_MONTH_TYPE + case _: ArrayType => TTypeId.ARRAY_TYPE + case _: MapType => TTypeId.MAP_TYPE + case _: StructType => TTypeId.STRUCT_TYPE + case other => + throw new IllegalArgumentException(s"Unrecognized type name: ${other.catalogString}") + } + + private def toTTypeQualifiers(typ: DataType): TTypeQualifiers = { + val ret = new TTypeQualifiers() + val qualifiers = typ match { + case d: DecimalType => + Map( + TCLIServiceConstants.PRECISION -> TTypeQualifierValue.i32Value(d.precision), + TCLIServiceConstants.SCALE -> TTypeQualifierValue.i32Value(d.scale)).asJava + case _ => Collections.emptyMap[String, TTypeQualifierValue]() + } + ret.setQualifiers(qualifiers) + ret + } + + private def toTTypeDesc(typ: DataType): TTypeDesc = { + val typeEntry = new TPrimitiveTypeEntry(toTTypeId(typ)) + typeEntry.setTypeQualifiers(toTTypeQualifiers(typ)) + val tTypeDesc = new TTypeDesc() + tTypeDesc.addToTypes(TTypeEntry.primitiveEntry(typeEntry)) + tTypeDesc + } + + private def 
toTColumnDesc(field: StructField, pos: Int): TColumnDesc = { + val tColumnDesc = new TColumnDesc() + tColumnDesc.setColumnName(field.name) + tColumnDesc.setTypeDesc(toTTypeDesc(field.dataType)) + tColumnDesc.setComment(field.getComment().getOrElse("")) + tColumnDesc.setPosition(pos) + tColumnDesc + } + + def toTTableSchema(schema: StructType): TTableSchema = { + val tTableSchema = new TTableSchema() + schema.zipWithIndex.foreach { case (f, i) => + tTableSchema.addToColumns(toTColumnDesc(f, i)) } - new TableSchema(schema.asJava) + tTableSchema } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index bcb8ef0cdfb..d135fe7efbb 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -35,9 +35,10 @@ import com.google.common.io.Files import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.jdbc.HiveDriver import org.apache.hive.service.auth.PlainSaslHelper -import org.apache.hive.service.cli.{FetchOrientation, FetchType, GetInfoType, RowSet} +import org.apache.hive.service.cli.{CLIService, FetchOrientation, FetchType, GetInfoType, RowSetFactory} import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient import org.apache.hive.service.rpc.thrift.TCLIService.Client +import org.apache.hive.service.rpc.thrift.TRowSet import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TSocket import org.scalatest.BeforeAndAfterAll @@ -123,7 +124,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test { 1000, FetchType.QUERY_OUTPUT) - rows_next.numRows() + RowSetFactory.create(rows_next, sessionHandle.getProtocolVersion).numRows() } // Fetch result second time from first row @@ -135,7 +136,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test { 1000, FetchType.QUERY_OUTPUT) - rows_first.numRows() + RowSetFactory.create(rows_first, sessionHandle.getProtocolVersion).numRows() } } } @@ -749,10 +750,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test { } test("ThriftCLIService FetchResults FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR") { - def checkResult(rows: RowSet, start: Long, end: Long): Unit = { - assert(rows.getStartOffset() == start) - assert(rows.numRows() == end - start) - rows.iterator.asScala.zip((start until end).iterator).foreach { case (row, v) => + def checkResult(rows: TRowSet, start: Long, end: Long): Unit = { + val rowSet = RowSetFactory.create(rows, CLIService.SERVER_VERSION) + assert(rowSet.getStartOffset == start) + assert(rowSet.numRows() == end - start) + rowSet.iterator.asScala.zip((start until end).iterator).foreach { case (row, v) => assert(row(0).asInstanceOf[Long] === v) } } @@ -766,7 +768,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test { sessionHandle, "SELECT * FROM range(10)", confOverlay) // 10 rows result with sequence 0, 1, 2, ..., 9 - var rows: RowSet = null + var rows: TRowSet = null // Fetch 5 rows with FETCH_NEXT rows = client.fetchResults( @@ -868,7 +870,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test { FetchOrientation.FETCH_NEXT, 1000, FetchType.QUERY_OUTPUT) - rows_next.numRows() + RowSetFactory.create(rows_next, sessionHandle.getProtocolVersion).numRows() } } } diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala index c8bb6d9ee08..b61c91f3109 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala @@ -25,7 +25,7 @@ import scala.concurrent.duration._ import org.apache.hadoop.hive.conf.HiveConf import org.apache.hive.service.cli.OperationState import org.apache.hive.service.cli.session.{HiveSession, HiveSessionImpl} -import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TTypeId} import org.mockito.Mockito.{doReturn, mock, spy, when, RETURNS_DEEP_STUBS} import org.mockito.invocation.InvocationOnMock @@ -41,22 +41,28 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark val field1 = StructField("NULL", NullType) val field2 = StructField("(IF(true, NULL, NULL))", NullType) val tableSchema = StructType(Seq(field1, field2)) - val columns = SparkExecuteStatementOperation.getTableSchema(tableSchema).getColumnDescriptors() - assert(columns.size() == 2) - assert(columns.get(0).getType().getName == "VOID") - assert(columns.get(1).getType().getName == "VOID") + val columns = SparkExecuteStatementOperation.toTTableSchema(tableSchema) + assert(columns.getColumnsSize == 2) + assert(columns.getColumns.get(0).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType + === TTypeId.NULL_TYPE) + assert(columns.getColumns.get(1).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType + === TTypeId.NULL_TYPE) } test("SPARK-20146 Comment should be preserved") { val field1 = StructField("column1", StringType).withComment("comment 1") val field2 = StructField("column2", IntegerType) val tableSchema = StructType(Seq(field1, field2)) - val columns = SparkExecuteStatementOperation.getTableSchema(tableSchema).getColumnDescriptors() - assert(columns.size() == 2) - assert(columns.get(0).getType().getName == "STRING") - assert(columns.get(0).getComment() == "comment 1") - assert(columns.get(1).getType().getName == "INT") - assert(columns.get(1).getComment() == "") + val columns = SparkExecuteStatementOperation.toTTableSchema(tableSchema) + assert(columns.getColumnsSize == 2) + assert(columns.getColumns.get(0).getColumnName == "column1") + assert(columns.getColumns.get(0).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType + === TTypeId.STRING_TYPE) + assert(columns.getColumns.get(0).getComment == "comment 1") + assert(columns.getColumns.get(1).getColumnName == "column2") + assert(columns.getColumns.get(1).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType + === TTypeId.INT_TYPE) + assert(columns.getColumns.get(1).getComment == "") } Seq( diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index b5cfa04bab5..94d7318e620 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -128,14 +128,14 @@ trait 
ThriftServerWithSparkContextSuite extends SharedThriftServer { val opHandle1 = exec("select current_user(), current_user") val rowSet1 = client.fetchResults(opHandle1) - rowSet1.toTRowSet.getColumns.forEach { col => + rowSet1.getColumns.forEach { col => assert(col.getStringVal.getValues.get(0) === clientUser) } exec(s"set ${SQLConf.ANSI_ENABLED.key}=true") exec(s"set ${SQLConf.ENFORCE_RESERVED_KEYWORDS.key}=true") val opHandle2 = exec("select current_user") - assert(client.fetchResults(opHandle2).toTRowSet.getColumns.get(0) + assert(client.fetchResults(opHandle2).getColumns.get(0) .getStringVal.getValues.get(0) === clientUser) val e = intercept[HiveSQLException](exec("select current_user()"))
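
The new schema mapping is a plain public helper, so its behavior is easy to check in isolation. Below is a minimal sketch, assuming the sql/hive-thriftserver classes and the hive-service-rpc thrift bindings are on the classpath; the wrapper object is hypothetical, while the helper and thrift getter names are taken from the diff above:

import org.apache.hive.service.rpc.thrift.TTypeId
import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation
import org.apache.spark.sql.types._

object SchemaMappingSketch {
  def main(args: Array[String]): Unit = {
    // One step from Spark's StructType to the thrift TTableSchema; no detour
    // through hive serde2 Type, which lacks TIMESTAMPLOCALTZ_TYPE.
    val schema = new StructType()
      .add("id", LongType)
      .add("price", DecimalType(10, 2)) // precision/scale become TTypeQualifiers
      .add("ts", TimestampNTZType)

    val tSchema = SparkExecuteStatementOperation.toTTableSchema(schema)
    val cols = tSchema.getColumns
    assert(cols.get(0).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType
      == TTypeId.BIGINT_TYPE)
    // TimestampNTZ is still reported as TIMESTAMP_TYPE, per the TODO in the patch.
    assert(cols.get(2).getTypeDesc.getTypes.get(0).getPrimitiveEntry.getType
      == TTypeId.TIMESTAMP_TYPE)
  }
}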
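
RowSetUtils.toTRowSet makes the protocol-version split concrete: clients at HIVE_CLI_SERVICE_PROTOCOL_V6 or newer receive the column-based encoding (one TColumn per field, with nulls tracked in a BitSet), while older clients receive row-based TRows. A sketch under the same classpath assumptions, again with a hypothetical wrapper object:

import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.hive.thriftserver.RowSetUtils
import org.apache.spark.sql.types.{DataType, FloatType, IntegerType}

object RowSetSketch {
  def main(args: Array[String]): Unit = {
    val types: Array[DataType] = Array(IntegerType, FloatType)
    val rows = Seq(Row(1, 0.1f), Row(null, 0.2f))

    // V6 and newer: one TColumn per field.
    val columnar = RowSetUtils.toTRowSet(
      0L, rows, types, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6,
      HiveResult.getTimeFormatters)
    assert(columnar.getColumnsSize == 2)
    // Floats travel as doubles; the toString detour keeps 0.1f as 0.1
    // rather than 0.10000000149011612.
    assert(columnar.getColumns.get(1).getDoubleVal.getValues.get(0) == 0.1d)

    // Pre-V6: one TRow per row.
    val rowBased = RowSetUtils.toTRowSet(
      0L, rows, types, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V5,
      HiveResult.getTimeFormatters)
    assert(rowBased.getRowsSize == 2)
  }
}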