This is an automated email from the ASF dual-hosted git repository.

mgaido pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
     new d0d8028  [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT][FOLLOWUP] Use ResultSet in catalog operations
d0d8028 is described below

commit d0d8028657c7314ad031f51c0a564a8786213187
Author: Marco Gaido <mga...@apache.org>
AuthorDate: Fri Aug 30 14:06:08 2019 +0200

    [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT][FOLLOWUP] Use ResultSet in catalog operations

    ## What changes were proposed in this pull request?

    This is a followup of #194 which addresses all the remaining concerns. The main changes are:
    - reverting the introduction of a state specific for catalog operations;
    - usage of `ResultSet` to send over the wire the data for catalog operations too.

    ## How was this patch tested?

    existing modified UTs

    Author: Marco Gaido <mga...@apache.org>

    Closes #217 from mgaido91/LIVY-622_followup.
---
 .../livy/thriftserver/LivyOperationManager.scala   | 18 ++---
 .../operation/GetColumnsOperation.scala            | 20 +++---
 .../operation/GetFunctionsOperation.scala          | 35 ++-------
 .../operation/GetSchemasOperation.scala            | 12 ++--
 .../operation/GetTablesOperation.scala             | 16 ++---
 .../thriftserver/operation/MetadataOperation.scala |  3 -
 .../operation/SparkCatalogOperation.scala          | 51 ++------------
 .../session/CleanupCatalogResultJob.java           | 37 ----------
 .../session/FetchCatalogResultJob.java             | 51 --------------
 .../livy/thriftserver/session/GetColumnsJob.java   | 31 ++++----
 .../livy/thriftserver/session/GetFunctionsJob.java | 66 ++++++++++++++---
 .../livy/thriftserver/session/GetSchemasJob.java   | 24 ++++---
 .../livy/thriftserver/session/GetTablesJob.java    | 19 ++---
 .../livy/thriftserver/session/SparkCatalogJob.java | 64 +++++++++++++++--
 .../livy/thriftserver/session/SparkUtils.java      | 46 ++++++++++++
 .../thriftserver/session/ThriftSessionState.java   | 32 ---------
 .../thriftserver/session/ThriftSessionTest.java    | 82 +++++++++++++---------
 17 files changed, 290 insertions(+), 317 deletions(-)

diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala index 2454185..eb1dd21 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala @@ -196,12 +196,7 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage tableTypes: util.List[String]): OperationHandle = { executeOperation(sessionHandle, { val op = new GetTablesOperation( - sessionHandle, - catalogName, - schemaName, - tableName, - tableTypes, - livyThriftSessionManager) + sessionHandle, schemaName, tableName, tableTypes, livyThriftSessionManager) addOperation(op, sessionHandle) op }) @@ -214,8 +209,8 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage schemaName: String, functionName: String): OperationHandle = { executeOperation(sessionHandle, { - val op = new GetFunctionsOperation(sessionHandle, catalogName, schemaName, functionName, - livyThriftSessionManager) + val op = new GetFunctionsOperation( + sessionHandle, schemaName, functionName, livyThriftSessionManager) addOperation(op, sessionHandle) op }) @@ -227,8 +222,7 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage catalogName: String, schemaName: String): OperationHandle = { executeOperation(sessionHandle, { - val op = new GetSchemasOperation(sessionHandle, catalogName, 
schemaName, - livyThriftSessionManager) + val op = new GetSchemasOperation(sessionHandle, schemaName, livyThriftSessionManager) addOperation(op, sessionHandle) op }) @@ -242,8 +236,8 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage tableName: String, columnName: String): OperationHandle = { executeOperation(sessionHandle, { - val op = new GetColumnsOperation(sessionHandle, catalogName, schemaName, tableName, - columnName, livyThriftSessionManager) + val op = new GetColumnsOperation( + sessionHandle, schemaName, tableName, columnName, livyThriftSessionManager) addOperation(op, sessionHandle) op }) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala index c9c106c..ae62679 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala @@ -26,10 +26,9 @@ import org.apache.livy.thriftserver.session.{GetColumnsJob, GetFunctionsJob} class GetColumnsOperation( sessionHandle: SessionHandle, - catalogName: String, - schemaName: String, - tableName: String, - columnName: String, + schemaPattern: String, + tablePattern: String, + columnPattern: String, sessionManager: LivyThriftSessionManager) extends SparkCatalogOperation( sessionHandle, OperationType.GET_COLUMNS, sessionManager) with Logging { @@ -39,12 +38,12 @@ class GetColumnsOperation( setState(OperationState.RUNNING) try { rscClient.submit(new GetColumnsJob( - convertSchemaPattern(schemaName), - convertIdentifierPattern(tableName, datanucleusFormat = true), - Option(columnName).map { convertIdentifierPattern(_, datanucleusFormat = false) }.orNull, + schemaPattern, + tablePattern, + columnPattern, sessionId, - jobId - )).get() + jobId, + GetColumnsOperation.SCHEMA.fields.map(_.fieldType.dataType))).get() setState(OperationState.FINISHED) } catch { @@ -97,6 +96,5 @@ object GetColumnsOperation { "user-generated Ref type, SQL type from java.sql.Types (null if DATA_TYPE isn't " + "DISTINCT or user-generated REF)"), Field("IS_AUTO_INCREMENT", BasicDataType("string"), "Indicates whether this column is " + - "auto incremented.") - ) + "auto incremented.")) } diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala index 0e43f16..4648f40 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala @@ -26,9 +26,8 @@ import org.apache.livy.thriftserver.LivyThriftSessionManager class GetFunctionsOperation( sessionHandle: SessionHandle, - catalogName: String, - schemaName: String, - functionName: String, + schemaPattern: String, + functionSQLSearch: String, sessionManager: LivyThriftSessionManager) extends SparkCatalogOperation( sessionHandle, OperationType.GET_FUNCTIONS, sessionManager) with Logging { @@ -38,11 +37,11 @@ class GetFunctionsOperation( setState(OperationState.RUNNING) try { rscClient.submit(new GetFunctionsJob( - convertSchemaPattern(schemaName), - convertFunctionName(functionName), + schemaPattern, + functionSQLSearch, sessionId, - jobId - )).get() + jobId, + 
GetFunctionsOperation.SCHEMA.fields.map(_.fieldType.dataType))).get() setState(OperationState.FINISHED) } catch { @@ -58,25 +57,6 @@ class GetFunctionsOperation( assertState(Seq(OperationState.FINISHED)) GetFunctionsOperation.SCHEMA } - - private def convertFunctionName(name: String): String = { - if (name == null) { - ".*" - } else { - var escape = false - name.flatMap { - case c if escape => - if (c != '\\') escape = false - c.toString - case '\\' => - escape = true - "" - case '%' => ".*" - case '_' => "." - case c => Character.toLowerCase(c).toString - } - } - } } object GetFunctionsOperation { @@ -89,6 +69,5 @@ object GetFunctionsOperation { Field("FUNCTION_TYPE", BasicDataType("integer"), "Kind of function."), Field("SPECIFIC_NAME", BasicDataType("string"), - "The name which uniquely identifies this function within its schema") - ) + "The name which uniquely identifies this function within its schema")) } diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala index 6bd0a17..95fb11a 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala @@ -26,8 +26,7 @@ import org.apache.livy.thriftserver.session.{GetSchemasJob, GetTablesJob} class GetSchemasOperation( sessionHandle: SessionHandle, - catalogName: String, - schemaName: String, + schemaPattern: String, sessionManager: LivyThriftSessionManager) extends SparkCatalogOperation( sessionHandle, OperationType.GET_SCHEMAS, sessionManager) with Logging { @@ -37,8 +36,10 @@ class GetSchemasOperation( setState(OperationState.RUNNING) try { rscClient.submit(new GetSchemasJob( - convertSchemaPattern(schemaName), sessionId, jobId - )).get() + schemaPattern, + sessionId, + jobId, + GetSchemasOperation.SCHEMA.fields.map(_.fieldType.dataType))).get() setState(OperationState.FINISHED) } catch { case e: Throwable => @@ -58,6 +59,5 @@ class GetSchemasOperation( object GetSchemasOperation { val SCHEMA = Schema( Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."), - Field("TABLE_CATALOG", BasicDataType("string"), "Catalog name.") - ) + Field("TABLE_CATALOG", BasicDataType("string"), "Catalog name.")) } diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala index 4a939b3..3492724 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala @@ -26,9 +26,8 @@ import org.apache.livy.thriftserver.session.GetTablesJob class GetTablesOperation( sessionHandle: SessionHandle, - catalogName: String, - schemaName: String, - tableName: String, + schemaPattern: String, + tablePattern: String, tableTypes: java.util.List[String], sessionManager: LivyThriftSessionManager) extends SparkCatalogOperation( @@ -39,12 +38,12 @@ class GetTablesOperation( setState(OperationState.RUNNING) try { rscClient.submit(new GetTablesJob( - convertSchemaPattern(schemaName), - convertIdentifierPattern(tableName, datanucleusFormat = true), + schemaPattern, + tablePattern, tableTypes, sessionId, - jobId - )).get() + jobId, + 
GetTablesOperation.SCHEMA.fields.map(_.fieldType.dataType))).get() setState(OperationState.FINISHED) } catch { @@ -68,6 +67,5 @@ object GetTablesOperation { Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."), Field("TABLE_NAME", BasicDataType("string"), "Table name."), Field("TABLE_TYPE", BasicDataType("string"), "The table type, e.g. \"TABLE\", \"VIEW\", etc."), - Field("REMARKS", BasicDataType("string"), "Comments about the table.") - ) + Field("REMARKS", BasicDataType("string"), "Comments about the table.")) } diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala index 48c17cc..6f70f02 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala @@ -23,9 +23,6 @@ import org.apache.livy.thriftserver.serde.ThriftResultSet /** * MetadataOperation is the base class for operations which do not perform any call on Spark side - * - * @param sessionHandle - * @param opType */ abstract class MetadataOperation(sessionHandle: SessionHandle, opType: OperationType) extends Operation(sessionHandle, opType) { diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala index 9ed31f7..2e2ecc5 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala @@ -17,12 +17,11 @@ package org.apache.livy.thriftserver.operation -import org.apache.commons.lang.StringUtils import org.apache.hive.service.cli._ import org.apache.livy.thriftserver.LivyThriftSessionManager import org.apache.livy.thriftserver.serde.ThriftResultSet -import org.apache.livy.thriftserver.session.{CleanupCatalogResultJob, FetchCatalogResultJob} +import org.apache.livy.thriftserver.session.{CleanupStatementJob, FetchResultJob} /** * SparkCatalogOperation is the base class for operations which need to fetch catalog information @@ -51,7 +50,7 @@ abstract class SparkCatalogOperation( @throws[HiveSQLException] override def close(): Unit = { - val cleaned = rscClient.submit(new CleanupCatalogResultJob(sessionId, jobId)).get() + val cleaned = rscClient.submit(new CleanupStatementJob(sessionId, jobId)).get() if (!cleaned) { warn(s"Fail to cleanup fetch catalog job (session = ${sessionId}), " + "this message can be ignored if the job failed.") @@ -66,54 +65,12 @@ abstract class SparkCatalogOperation( throw new UnsupportedOperationException("SparkCatalogOperation.cancel()") } - /** - * Convert wildchars and escape sequence from JDBC format to datanucleous/regex - * - * This is ported from Spark Hive Thrift MetaOperation. - */ - protected def convertIdentifierPattern(pattern: String, datanucleusFormat: Boolean): String = { - if (pattern == null) { - convertPattern("%", datanucleusFormat = true) - } else { - convertPattern(pattern, datanucleusFormat) - } - } - - /** - * Convert wildchars and escape sequence of schema pattern from JDBC format to datanucleous/regex - * The schema pattern treats empty string also as wildchar. - * - * This is ported from Spark Hive Thrift MetaOperation. 
- */ - protected def convertSchemaPattern(pattern: String): String = { - if (StringUtils.isEmpty(pattern)) { - convertPattern("%", datanucleusFormat = true) - } else { - convertPattern(pattern, datanucleusFormat = true) - } - } - - private def convertPattern(pattern: String, datanucleusFormat: Boolean): String = { - val wStr = if (datanucleusFormat) "*" else ".*" - pattern - .replaceAll("([^\\\\])%", "$1" + wStr) - .replaceAll("\\\\%", "%") - .replaceAll("^%", wStr) - .replaceAll("([^\\\\])_", "$1.") - .replaceAll("\\\\_", "_") - .replaceAll("^_", ".") - } - override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): ThriftResultSet = { validateFetchOrientation(orientation) assertState(Seq(OperationState.FINISHED)) setHasResultSet(true) val maxRows = maxRowsL.toInt - val results = rscClient.submit(new FetchCatalogResultJob(sessionId, jobId, maxRows)).get() - - val rowSet = ThriftResultSet.apply(getResultSetSchema, protocolVersion) - import scala.collection.JavaConverters._ - results.asScala.foreach { r => rowSet.addRow(r.asInstanceOf[Array[Any]]) } - return rowSet + val results = rscClient.submit(new FetchResultJob(sessionId, jobId, maxRows)).get() + ThriftResultSet(results) } } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java deleted file mode 100644 index b9444ca..0000000 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.thriftserver.session; - -import org.apache.livy.Job; -import org.apache.livy.JobContext; - -public class CleanupCatalogResultJob implements Job<Boolean> { - private final String sessionId; - private final String jobId; - - public CleanupCatalogResultJob(String sessionId, String jobId) { - this.sessionId = sessionId; - this.jobId = jobId; - } - - @Override - public Boolean call(JobContext jc) throws Exception { - ThriftSessionState session = ThriftSessionState.get(jc, sessionId); - return session.cleanupCatalogJob(jobId); - } -} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java deleted file mode 100644 index 9654b02..0000000 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.thriftserver.session; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.livy.Job; -import org.apache.livy.JobContext; - -public class FetchCatalogResultJob implements Job<List<Object[]>> { - private final String sessionId; - private final String jobId; - private final int maxRows; - - public FetchCatalogResultJob(String sessionId, String jobId, int maxRows) { - this.sessionId = sessionId; - this.jobId = jobId; - this.maxRows = maxRows; - } - - @Override - public List<Object[]> call(JobContext jc) throws Exception { - ThriftSessionState session = ThriftSessionState.get(jc, sessionId); - Iterator<Object[]> iterator = session.findCatalogJob(jobId).iter; - - List<Object[]> result = new ArrayList<>(); - int n = 0; - while (iterator.hasNext() && n < maxRows) { - result.add(iterator.next()); - n += 1; - } - return result; - } -} diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java index bc2bd73..b5a0cba 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java @@ -22,9 +22,11 @@ import java.util.List; import static scala.collection.JavaConversions.seqAsJavaList; +import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.catalog.CatalogTable; import org.apache.spark.sql.catalyst.catalog.SessionCatalog; +import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.sql.types.StructField; public class GetColumnsJob extends SparkCatalogJob { @@ -33,20 +35,21 @@ public class GetColumnsJob extends SparkCatalogJob { private final String columnPattern; public GetColumnsJob( - String databasePattern, - String tablePattern, - String columnPattern, - String sessionId, - String jobId) { - super(sessionId, jobId); - this.databasePattern = databasePattern; - this.tablePattern = tablePattern; - this.columnPattern = columnPattern; + String databasePattern, + String tablePattern, + String columnPattern, + String sessionId, + String jobId, + DataType[] resultTypes) { + super(sessionId, jobId, resultTypes); + this.databasePattern = convertSchemaPattern(databasePattern); + this.tablePattern = convertIdentifierPattern(tablePattern, true); + this.columnPattern = convertIdentifierPattern(columnPattern, false); } @Override - protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) { - List<Object[]> columnList = new ArrayList<Object[]>(); + protected List<Row> fetchCatalogObjects(SessionCatalog catalog) { + List<Row> columnList = new ArrayList<>(); List<String> databases = 
seqAsJavaList(catalog.listDatabases(databasePattern)); for (String db : databases) { @@ -57,8 +60,8 @@ public class GetColumnsJob extends SparkCatalogJob { List<StructField> fields = seqAsJavaList(table.schema()); int position = 0; for (StructField field : fields) { - if (columnPattern == null || field.name().matches(columnPattern)) { - columnList.add(new Object[] { + if (field.name().matches(columnPattern)) { + columnList.add(new GenericRow(new Object[] { DEFAULT_HIVE_CATALOG, table.database(), table.identifier().table(), @@ -82,7 +85,7 @@ public class GetColumnsJob extends SparkCatalogJob { null, // SCOPE_TABLE null, // SOURCE_DATA_TYPE "NO" // IS_AUTO_INCREMENT - }); + })); position += 1; } } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java index 297fb80..e5e383a 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java @@ -23,45 +23,89 @@ import java.util.List; import scala.Tuple2; import static scala.collection.JavaConversions.seqAsJavaList; +import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.FunctionIdentifier; import org.apache.spark.sql.catalyst.catalog.SessionCatalog; import org.apache.spark.sql.catalyst.expressions.ExpressionInfo; +import org.apache.spark.sql.catalyst.expressions.GenericRow; public class GetFunctionsJob extends SparkCatalogJob { + private static final char SEARCH_STRING_ESCAPE = '\\'; private final String databasePattern; - private final String functionName; + private final String functionRegex; public GetFunctionsJob( String databasePattern, - String functionName, + String functionSQLSearch, String sessionId, - String jobId) { - super(sessionId, jobId); - this.databasePattern = databasePattern; - this.functionName = functionName; + String jobId, + DataType[] resultTypes) { + super(sessionId, jobId, resultTypes); + // Spark is using regex to filter the function name, instead of SQL search patterns, + // so we need to convert them + this.databasePattern = convertSchemaPattern(databasePattern); + this.functionRegex = patternToRegex(functionSQLSearch); } @Override - protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) { - List<Object[]> funcList = new ArrayList<Object[]>(); + protected List<Row> fetchCatalogObjects(SessionCatalog catalog) { + List<Row> funcList = new ArrayList<>(); List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern)); for (String db : databases) { List<Tuple2<FunctionIdentifier, String>> identifiersTypes = - seqAsJavaList(catalog.listFunctions(db, functionName)); + seqAsJavaList(catalog.listFunctions(db, functionRegex)); for (Tuple2<FunctionIdentifier, String> identifierType : identifiersTypes) { FunctionIdentifier function = identifierType._1; ExpressionInfo info = catalog.lookupFunctionInfo(function); - funcList.add(new Object[] { + funcList.add(new GenericRow(new Object[] { null, function.database().isDefined() ? function.database().get() : null, function.funcName(), info.getUsage() + info.getExtended(), null, info.getClassName() - }); + })); } } return funcList; } + + /** + * Ported from Spark's CLIServiceUtils. + * Converts a SQL search pattern into an equivalent Java Regex. 
+ * + * @param pattern input which may contain '%' or '_' wildcard characters, or + * these characters escaped using { @code getSearchStringEscape()}. + * @return replace %/_ with regex search characters, also handle escaped characters. + */ + private String patternToRegex(String pattern) { + if (pattern == null) { + return ".*"; + } else { + StringBuilder result = new StringBuilder(pattern.length()); + + boolean escaped = false; + for (int i = 0, len = pattern.length(); i < len; i++) { + char c = pattern.charAt(i); + if (escaped) { + if (c != SEARCH_STRING_ESCAPE) { + escaped = false; + } + result.append(c); + } else { + if (c == SEARCH_STRING_ESCAPE) { + escaped = true; + } else if (c == '%') { + result.append(".*"); + } else if (c == '_') { + result.append('.'); + } else { + result.append(Character.toLowerCase(c)); + } + } + } + return result.toString(); + } + } } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java index 451bea2..59c4cca 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java @@ -22,25 +22,31 @@ import java.util.List; import static scala.collection.JavaConversions.seqAsJavaList; +import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.catalog.SessionCatalog; +import org.apache.spark.sql.catalyst.expressions.GenericRow; public class GetSchemasJob extends SparkCatalogJob { - private final String schemaName; + private final String schemaPattern; - public GetSchemasJob(String schemaName, String sessionId, String jobId) { - super(sessionId, jobId); - this.schemaName = schemaName; + public GetSchemasJob( + String schemaPattern, + String sessionId, + String jobId, + DataType[] resultTypes) { + super(sessionId, jobId, resultTypes); + this.schemaPattern = convertSchemaPattern(schemaPattern); } @Override - protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) { - List<String> databases = seqAsJavaList(catalog.listDatabases(schemaName)); - List<Object[]> schemas = new ArrayList<>(); + protected List<Row> fetchCatalogObjects(SessionCatalog catalog) { + List<String> databases = seqAsJavaList(catalog.listDatabases(schemaPattern)); + List<Row> schemas = new ArrayList<>(); for (String db : databases) { - schemas.add(new Object[] { + schemas.add(new GenericRow(new Object[] { db, DEFAULT_HIVE_CATALOG, - }); + })); } return schemas; } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java index a071aef..d3c6b53 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java @@ -22,10 +22,12 @@ import java.util.List; import static scala.collection.JavaConversions.seqAsJavaList; +import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.catalog.CatalogTable; import org.apache.spark.sql.catalyst.catalog.CatalogTableType; import org.apache.spark.sql.catalyst.catalog.SessionCatalog; +import org.apache.spark.sql.catalyst.expressions.GenericRow; public class GetTablesJob extends SparkCatalogJob { private final String databasePattern; @@ -37,10 
+39,11 @@ public class GetTablesJob extends SparkCatalogJob { String tablePattern, List<String> tableTypes, String sessionId, - String jobId) { - super(sessionId, jobId); - this.databasePattern = databasePattern; - this.tablePattern = tablePattern; + String jobId, + DataType[] resultTypes) { + super(sessionId, jobId, resultTypes); + this.databasePattern = convertSchemaPattern(databasePattern); + this.tablePattern = convertIdentifierPattern(tablePattern, true); if (tableTypes != null) { for (String type : tableTypes) { this.tableTypes.add(type.toUpperCase()); @@ -49,8 +52,8 @@ public class GetTablesJob extends SparkCatalogJob { } @Override - protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) { - List<Object[]> tableList = new ArrayList<Object[]>(); + protected List<Row> fetchCatalogObjects(SessionCatalog catalog) { + List<Row> tableList = new ArrayList<Row>(); List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern)); for (String db : databases) { List<TableIdentifier> tableIdentifiers = @@ -59,14 +62,14 @@ public class GetTablesJob extends SparkCatalogJob { CatalogTable table = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier); String type = convertTableType(table.tableType().name()); if (tableTypes.isEmpty() || tableTypes.contains(type)) { - tableList.add( + tableList.add(new GenericRow( new Object[] { DEFAULT_HIVE_CATALOG, table.database(), table.identifier().table(), type, table.comment().isDefined() ? table.comment().get() : "" - }); + })); } } } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java index e86721e..afbdf32 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java @@ -19,6 +19,7 @@ package org.apache.livy.thriftserver.session; import java.util.List; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.catalog.SessionCatalog; @@ -30,21 +31,76 @@ public abstract class SparkCatalogJob implements Job<Void> { private final String sessionId; private final String jobId; + private final DataType[] resultTypes; - public SparkCatalogJob(String sessionId, String jobId) { + public SparkCatalogJob(String sessionId, String jobId, DataType[] resultTypes) { this.sessionId = sessionId; this.jobId = jobId; + this.resultTypes = resultTypes; } - protected abstract List<Object[]> fetchCatalogObjects(SessionCatalog catalog); + protected abstract List<Row> fetchCatalogObjects(SessionCatalog catalog); @Override public Void call(JobContext jc) throws Exception { SessionCatalog catalog = ((SparkSession)jc.sparkSession()).sessionState().catalog(); - List<Object[]> objects = fetchCatalogObjects(catalog); + List<Row> objects = fetchCatalogObjects(catalog); ThriftSessionState session = ThriftSessionState.get(jc, sessionId); - session.registerCatalogJob(jobId, objects.iterator()); + session.registerStatement( + jobId, SparkUtils.dataTypesToSchema(resultTypes), objects.iterator()); return null; } + + /** + * Convert wildchars and escape sequence from JDBC format to datanucleous/regex. + * + * This is ported from Spark Hive Thrift MetaOperation. 
+ */ + protected String convertIdentifierPattern(final String pattern, boolean datanucleusFormat) { + if (pattern == null) { + return convertPattern("%", true); + } else { + return convertPattern(pattern, datanucleusFormat); + } + } + + /** + * Convert wildchars and escape sequence of schema pattern from JDBC format to datanucleous/regex + * The schema pattern treats empty string also as wildchar. + * + * This is ported from Spark Hive Thrift MetaOperation. + */ + protected String convertSchemaPattern(final String pattern) { + if ((pattern == null) || pattern.isEmpty()) { + return convertPattern("%", true); + } else { + return convertPattern(pattern, true); + } + } + + /** + * Convert a pattern containing JDBC catalog search wildcards into + * Java regex patterns. + * + * @param pattern input which may contain '%' or '_' wildcard characters. + * @return replace %/_ with regex search characters, also handle escaped characters. + * + * The datanucleus module expects the wildchar as '*'. The columns search on the + * other hand is done locally inside the hive code and that requires the regex wildchar + * format '.*' This is driven by the datanucleusFormat flag. + * + * This is ported from Spark Hive Thrift MetaOperation. + */ + private String convertPattern(final String pattern, boolean datanucleusFormat) { + String wStr; + if (datanucleusFormat) { + wStr = "*"; + } else { + wStr = ".*"; + } + return pattern + .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr) + .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", "."); + } } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java index fac79ad..a91cf35 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java @@ -60,6 +60,52 @@ final class SparkUtils { } /** + * Translates our Thrift data types to Spark types + */ + public static StructType dataTypesToSchema(DataType[] types) { + StructField[] fields = new StructField[types.length]; + int idx = 0; + for (DataType dt : types) { + org.apache.spark.sql.types.DataType sparkDt = null; + switch (dt) { + case BOOLEAN: + sparkDt = DataTypes.BooleanType; + break; + case BYTE: + sparkDt = DataTypes.ByteType; + break; + case SHORT: + sparkDt = DataTypes.ShortType; + break; + case INTEGER: + sparkDt = DataTypes.IntegerType; + break; + case LONG: + sparkDt = DataTypes.LongType; + break; + case FLOAT: + sparkDt = DataTypes.FloatType; + break; + case DOUBLE: + sparkDt = DataTypes.DoubleType; + break; + case BINARY: + sparkDt = DataTypes.BinaryType; + break; + case STRING: + sparkDt = DataTypes.StringType; + break; + default: + throw new IllegalArgumentException("Invalid data type: " + dt); + } + fields[idx] = new StructField( + "col_" + idx, sparkDt, true, Metadata.empty()); + idx++; + } + return new StructType(fields); + } + + /** * This method is ported from Spark Hive Thrift server Type class * @param type * @return diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java index 9111d94..571c2e1 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java +++ 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java @@ -77,7 +77,6 @@ class ThriftSessionState { this.sessionId = sessionId; this.statements = new ConcurrentHashMap<>(); this.spark = ctx.<SparkSession>sparkSession().newSession(); - this.catalogJobStates = new ConcurrentHashMap<>(); } SparkSession spark() { @@ -126,35 +125,4 @@ class ThriftSessionState { throw new IllegalArgumentException(err); } } - - - private final ConcurrentMap<String, CatalogJobState> catalogJobStates; - - void registerCatalogJob(String jobId, Iterator<Object[]> results) { - checkNotNull(jobId, "No catalog job ID."); - CatalogJobState state = new CatalogJobState(results); - if (catalogJobStates.putIfAbsent(jobId, state) != null) { - throw new IllegalStateException( - String.format("Catalog job %s already registered.", jobId)); - } - } - - CatalogJobState findCatalogJob(String jobId) { - checkNotNull(jobId, "No catalog job ID."); - CatalogJobState state = catalogJobStates.get(jobId); - if (state == null) { - throw catalogJobNotFound(jobId); - } - return state; - } - - boolean cleanupCatalogJob(String jobId) { - checkNotNull(jobId, "No catalog job ID."); - return catalogJobStates.remove(jobId) != null; - } - - private NoSuchElementException catalogJobNotFound(String jobId) { - return new NoSuchElementException( - String.format("Catalog job %s not found in session %s.", jobId, sessionId)); - } } diff --git a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java index addc630..26c5c5a 100644 --- a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java +++ b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java @@ -132,49 +132,61 @@ public class ThriftSessionTest { String s3 = nextSession(); waitFor(new RegisterSessionJob(s3)); String getSchemaJobId = "test_get_schema_job"; - waitFor(new GetSchemasJob("default", s3, getSchemaJobId)); - List<Object[]> schemas = waitFor( - new FetchCatalogResultJob(s3, getSchemaJobId, Integer.MAX_VALUE)); - assertEquals(1, schemas.size()); - assertEquals("default", schemas.get(0)[0]); - assertTrue(waitFor(new CleanupCatalogResultJob(s3, getSchemaJobId))); + DataType[] resultTypes = new DataType[] { DataType.STRING, DataType.STRING }; + waitFor(new GetSchemasJob("default", s3, getSchemaJobId, resultTypes)); + rs = waitFor(new FetchResultJob(s3, getSchemaJobId, Integer.MAX_VALUE)); + cols = rs.getColumns(); + assertEquals(1, cols[0].size()); + assertEquals("default", cols[0].get(0)); + assertTrue(waitFor(new CleanupStatementJob(s3, getSchemaJobId))); String getTablesJobId = "test_get_tables_job"; List<String> testTableTypes = new ArrayList<>(); testTableTypes.add("Table"); - waitFor(new GetTablesJob("default", "*", - testTableTypes, s3, getTablesJobId)); - List<Object[]> tables = waitFor( - new FetchCatalogResultJob(s3, getTablesJobId, Integer.MAX_VALUE)); - assertEquals(1, tables.size()); - assertEquals("default", tables.get(0)[1]); - assertEquals("test", tables.get(0)[2]); - assertTrue(waitFor(new CleanupCatalogResultJob(s3, getTablesJobId))); + resultTypes = new DataType[] { + DataType.STRING, DataType.STRING, DataType.STRING, DataType.STRING, DataType.STRING }; + waitFor(new GetTablesJob( + "default", "*", testTableTypes, s3, getTablesJobId, resultTypes)); + rs = waitFor(new FetchResultJob(s3, getTablesJobId, Integer.MAX_VALUE)); 
+ cols = rs.getColumns(); + assertEquals(1, cols[0].size()); + assertEquals("default", cols[1].get(0)); + assertEquals("test", cols[2].get(0)); + assertTrue(waitFor(new CleanupStatementJob(s3, getTablesJobId))); String getColumnsJobId = "test_get_columns_job"; - waitFor(new GetColumnsJob("default", "test", ".*", s3, getColumnsJobId)); - List<Object[]> columns = waitFor( - new FetchCatalogResultJob(s3, getColumnsJobId, Integer.MAX_VALUE)); - assertEquals(2, columns.size()); - assertEquals("default", columns.get(0)[1]); - assertEquals("test", columns.get(0)[2]); - assertEquals("id", columns.get(0)[3]); - assertEquals("integer", columns.get(0)[5]); - assertEquals("default", columns.get(1)[1]); - assertEquals("test", columns.get(1)[2]); - assertEquals("desc", columns.get(1)[3]); - assertEquals("string", columns.get(1)[5]); - assertTrue(waitFor(new CleanupCatalogResultJob(s3, getColumnsJobId))); + resultTypes = new DataType[] { + DataType.STRING, DataType.STRING, DataType.STRING, DataType.STRING, DataType.INTEGER, + DataType.STRING, DataType.INTEGER, DataType.BYTE, DataType.INTEGER, DataType.INTEGER, + DataType.INTEGER, DataType.STRING, DataType.STRING, DataType.INTEGER, DataType.INTEGER, + DataType.INTEGER, DataType.INTEGER, DataType.STRING, DataType.STRING, DataType.STRING, + DataType.STRING, DataType.SHORT, DataType.STRING }; + waitFor(new GetColumnsJob("default", "test", ".*", s3, getColumnsJobId, resultTypes)); + rs = waitFor(new FetchResultJob(s3, getColumnsJobId, Integer.MAX_VALUE)); + cols = rs.getColumns(); + assertEquals(2, cols[0].size()); + assertEquals("default", cols[1].get(0)); + assertEquals("test", cols[2].get(0)); + assertEquals("id", cols[3].get(0)); + assertEquals("integer", cols[5].get(0)); + assertEquals("default", cols[1].get(1)); + assertEquals("test", cols[2].get(1)); + assertEquals("desc", cols[3].get(1)); + assertEquals("string", cols[5].get(1)); + assertTrue(waitFor(new CleanupStatementJob(s3, getColumnsJobId))); String getFunctionsJobId = "test_get_functions_job"; - waitFor(new GetFunctionsJob("default", "unix_timestamp", s3, getFunctionsJobId)); - List<Object[]> functions = waitFor( - new FetchCatalogResultJob(s3, getFunctionsJobId, Integer.MAX_VALUE)); - assertEquals(1, functions.size()); - assertNull(functions.get(0)[1]); - assertEquals("unix_timestamp", functions.get(0)[2]); - assertEquals("org.apache.spark.sql.catalyst.expressions.UnixTimestamp", functions.get(0)[5]); - assertTrue(waitFor(new CleanupCatalogResultJob(s3, getFunctionsJobId))); + resultTypes = new DataType[] { + DataType.STRING, DataType.STRING, DataType.STRING, DataType.STRING, DataType.INTEGER, + DataType.STRING }; + waitFor(new GetFunctionsJob("default", "unix_timestamp", s3, getFunctionsJobId, resultTypes)); + rs = waitFor(new FetchResultJob(s3, getFunctionsJobId, Integer.MAX_VALUE)); + cols = rs.getColumns(); + assertEquals(1, cols[0].size()); + assertNull(cols[1].get(0)); + assertEquals("unix_timestamp", cols[2].get(0)); + assertEquals("org.apache.spark.sql.catalyst.expressions.UnixTimestamp", cols[5].get(0)); + assertTrue(waitFor(new CleanupStatementJob(s3, getFunctionsJobId))); // Tear down the session. waitFor(new UnregisterSessionJob(s3));
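For reviewers who want to try the pattern handling outside of a Livy build, here is a minimal standalone sketch (the wrapper class and its `main` driver are illustrative, not part of the patch) of the two conversions this change moves into the session-side jobs: `convertPattern`, which `SparkCatalogJob` uses to turn JDBC `%`/`_` wildcards into datanucleus (`*`) or regex (`.*`) wildcards before querying the `SessionCatalog`, and `patternToRegex`, which `GetFunctionsJob` uses because Spark filters function names with a regex rather than a SQL search pattern:

```java
// Standalone sketch mirroring the pattern conversions added to
// SparkCatalogJob and GetFunctionsJob in this patch; the class name and
// main() are illustrative only.
public class PatternConversionSketch {

  /** JDBC '%'/'_' wildcards -> datanucleus '*' or regex '.*' wildcards. */
  static String convertPattern(String pattern, boolean datanucleusFormat) {
    String wStr = datanucleusFormat ? "*" : ".*";
    return pattern
        .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
        .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
  }

  /** JDBC SQL search pattern -> lower-cased Java regex ('%' -> '.*', '_' -> '.'). */
  static String patternToRegex(String pattern) {
    if (pattern == null) {
      return ".*";
    }
    StringBuilder result = new StringBuilder(pattern.length());
    boolean escaped = false;
    for (int i = 0; i < pattern.length(); i++) {
      char c = pattern.charAt(i);
      if (escaped) {
        // An escaped character is emitted literally.
        if (c != '\\') {
          escaped = false;
        }
        result.append(c);
      } else if (c == '\\') {
        escaped = true;
      } else if (c == '%') {
        result.append(".*");
      } else if (c == '_') {
        result.append('.');
      } else {
        result.append(Character.toLowerCase(c));
      }
    }
    return result.toString();
  }

  public static void main(String[] args) {
    System.out.println(convertPattern("db\\_1%", true));  // db_1*
    System.out.println(convertPattern("col_%", false));   // col..*
    System.out.println(patternToRegex("unix%stamp"));     // unix.*stamp
  }
}
```

Escaped wildcards (`\%`, `\_`) are kept as literals in both conversions, matching the behavior ported from Spark's MetaOperation and CLIServiceUtils.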