Repository: flink Updated Branches: refs/heads/master 51a5048b2 -> ecbccd940
[FLINK-4550] [table] Clearly define SQL operator table This closes #2502. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ecbccd94 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ecbccd94 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ecbccd94 Branch: refs/heads/master Commit: ecbccd940d2df462215b7a79e895114b3d2df3cf Parents: 51a5048 Author: twalthr <twal...@apache.org> Authored: Thu Sep 15 16:56:16 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Fri Sep 23 16:47:59 2016 +0200 ---------------------------------------------------------------------- .../flink/api/table/TableEnvironment.scala | 2 +- .../api/table/validate/FunctionCatalog.scala | 123 +++++++++++++++++-- 2 files changed, 116 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ecbccd94/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index d7e650c..b95198c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -63,7 +63,7 @@ abstract class TableEnvironment(val config: TableConfig) { private val tables: SchemaPlus = Frameworks.createRootSchema(true) // Table API/SQL function catalog - private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuildIns + private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns // SQL operator and function catalog private val sqlOperatorTable: SqlOperatorTable = functionCatalog.getSqlOperatorTable http://git-wip-us.apache.org/repos/asf/flink/blob/ecbccd94/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala index 42d460e..9c66730 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala @@ -19,8 +19,8 @@ package org.apache.flink.api.table.validate import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable} -import org.apache.calcite.sql.{SqlFunction, SqlOperatorTable} +import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable} +import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable} import org.apache.flink.api.table.ValidationException import org.apache.flink.api.table.expressions._ import org.apache.flink.api.table.functions.ScalarFunction @@ -31,7 +31,8 @@ import scala.collection.mutable import scala.util.{Failure, Success, Try} /** - * A catalog for looking up user-defined functions, used during validation phase. + * A catalog for looking up (user-defined) functions, used during validation phases + * of both Table API and SQL API. */ class FunctionCatalog { @@ -48,7 +49,7 @@ class FunctionCatalog { def getSqlOperatorTable: SqlOperatorTable = ChainedSqlOperatorTable.of( - SqlStdOperatorTable.instance(), + new BasicOperatorTable(), new ListSqlOperatorTable(sqlFunctions) ) @@ -116,7 +117,7 @@ class FunctionCatalog { object FunctionCatalog { - val buildInFunctions: Map[String, Class[_]] = Map( + val builtInFunctions: Map[String, Class[_]] = Map( // logic "isNull" -> classOf[IsNull], "isNotNull" -> classOf[IsNotNull], @@ -169,11 +170,117 @@ object FunctionCatalog { ) /** - * Create a new function catalog with build-in functions. + * Create a new function catalog with built-in functions. */ - def withBuildIns: FunctionCatalog = { + def withBuiltIns: FunctionCatalog = { val catalog = new FunctionCatalog() - buildInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) } + builtInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) } catalog } } + +class BasicOperatorTable extends ReflectiveSqlOperatorTable { + + /** + * List of supported SQL operators / functions. + * + * This list should be kept in sync with [[SqlStdOperatorTable]]. + */ + private val builtInSqlOperators: Seq[SqlOperator] = Seq( + // SET OPERATORS + SqlStdOperatorTable.UNION, + SqlStdOperatorTable.UNION_ALL, + SqlStdOperatorTable.EXCEPT, + SqlStdOperatorTable.EXCEPT_ALL, + SqlStdOperatorTable.INTERSECT, + SqlStdOperatorTable.INTERSECT_ALL, + // BINARY OPERATORS + SqlStdOperatorTable.AND, + SqlStdOperatorTable.AS, + SqlStdOperatorTable.CONCAT, + SqlStdOperatorTable.DIVIDE, + SqlStdOperatorTable.DIVIDE_INTEGER, + SqlStdOperatorTable.DOT, + SqlStdOperatorTable.EQUALS, + SqlStdOperatorTable.GREATER_THAN, + SqlStdOperatorTable.IS_DISTINCT_FROM, + SqlStdOperatorTable.IS_NOT_DISTINCT_FROM, + SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, + SqlStdOperatorTable.LESS_THAN, + SqlStdOperatorTable.LESS_THAN_OR_EQUAL, + SqlStdOperatorTable.MINUS, + SqlStdOperatorTable.MULTIPLY, + SqlStdOperatorTable.NOT_EQUALS, + SqlStdOperatorTable.OR, + SqlStdOperatorTable.PLUS, + SqlStdOperatorTable.DATETIME_PLUS, + // POSTFIX OPERATORS + SqlStdOperatorTable.DESC, + SqlStdOperatorTable.NULLS_FIRST, + SqlStdOperatorTable.IS_NOT_NULL, + SqlStdOperatorTable.IS_NULL, + SqlStdOperatorTable.IS_NOT_TRUE, + SqlStdOperatorTable.IS_TRUE, + SqlStdOperatorTable.IS_NOT_FALSE, + SqlStdOperatorTable.IS_FALSE, + SqlStdOperatorTable.IS_NOT_UNKNOWN, + SqlStdOperatorTable.IS_UNKNOWN, + // PREFIX OPERATORS + SqlStdOperatorTable.NOT, + SqlStdOperatorTable.UNARY_MINUS, + SqlStdOperatorTable.UNARY_PLUS, + // AGGREGATE OPERATORS + SqlStdOperatorTable.SUM, + SqlStdOperatorTable.COUNT, + SqlStdOperatorTable.MIN, + SqlStdOperatorTable.MAX, + SqlStdOperatorTable.AVG, + // SPECIAL OPERATORS + SqlStdOperatorTable.ROW, + SqlStdOperatorTable.OVERLAPS, + SqlStdOperatorTable.LITERAL_CHAIN, + SqlStdOperatorTable.BETWEEN, + SqlStdOperatorTable.SYMMETRIC_BETWEEN, + SqlStdOperatorTable.NOT_BETWEEN, + SqlStdOperatorTable.SYMMETRIC_NOT_BETWEEN, + SqlStdOperatorTable.NOT_LIKE, + SqlStdOperatorTable.LIKE, + SqlStdOperatorTable.NOT_SIMILAR_TO, + SqlStdOperatorTable.SIMILAR_TO, + SqlStdOperatorTable.CASE, + SqlStdOperatorTable.REINTERPRET, + SqlStdOperatorTable.EXTRACT_DATE, + // FUNCTIONS + SqlStdOperatorTable.SUBSTRING, + SqlStdOperatorTable.OVERLAY, + SqlStdOperatorTable.TRIM, + SqlStdOperatorTable.POSITION, + SqlStdOperatorTable.CHAR_LENGTH, + SqlStdOperatorTable.CHARACTER_LENGTH, + SqlStdOperatorTable.UPPER, + SqlStdOperatorTable.LOWER, + SqlStdOperatorTable.INITCAP, + SqlStdOperatorTable.POWER, + SqlStdOperatorTable.SQRT, + SqlStdOperatorTable.MOD, + SqlStdOperatorTable.LN, + SqlStdOperatorTable.LOG10, + SqlStdOperatorTable.ABS, + SqlStdOperatorTable.EXP, + SqlStdOperatorTable.NULLIF, + SqlStdOperatorTable.COALESCE, + SqlStdOperatorTable.FLOOR, + SqlStdOperatorTable.CEIL, + SqlStdOperatorTable.LOCALTIME, + SqlStdOperatorTable.LOCALTIMESTAMP, + SqlStdOperatorTable.CURRENT_TIME, + SqlStdOperatorTable.CURRENT_TIMESTAMP, + SqlStdOperatorTable.CURRENT_DATE, + SqlStdOperatorTable.CAST, + SqlStdOperatorTable.EXTRACT, + SqlStdOperatorTable.QUARTER, + SqlStdOperatorTable.SCALAR_QUERY + ) + + builtInSqlOperators.foreach(register) +}