[FLINK-3754] [tableAPI] Add validation phase to Table API before construction of RelNodes.
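The heart of this change: expression trees now validate themselves bottom-up, so that unknown fields, type mismatches, and unsupported operations surface as a ValidationException before any Calcite RelNode is constructed. What follows is a minimal, self-contained sketch of that design, assuming nothing beyond the Scala standard library. The names ExprValidationResult, ValidationSuccess, ValidationFailure, valid, and validateInput mirror the classes added in this patch; the String-typed resultType and the Field/Sum demo nodes are simplified stand-ins for illustration, not Flink's actual API.

// Sketch of the validation design; simplified stand-in types, see note above.
sealed trait ExprValidationResult { def isSuccess: Boolean }
case object ValidationSuccess extends ExprValidationResult { val isSuccess = true }
case class ValidationFailure(msg: String) extends ExprValidationResult { val isSuccess = false }

abstract class Expression {
  def children: Seq[Expression]
  def resultType: String // stands in for Flink's TypeInformation[_]

  // per-node check; meant to run only after all children validated successfully
  def validateInput(): ExprValidationResult = ValidationSuccess

  // one-pass, post-order validation of the whole tree
  lazy val valid: Boolean = children.forall(_.valid) && validateInput().isSuccess
}

case class Field(name: String, resultType: String) extends Expression {
  val children = Nil
}

case class Sum(child: Expression) extends Expression {
  val children = Seq(child)
  def resultType: String = child.resultType
  override def validateInput(): ExprValidationResult =
    if (Set("INT", "LONG", "DOUBLE")(child.resultType)) ValidationSuccess
    else ValidationFailure(s"sum requires a numeric child, got ${child.resultType}")
}

object ValidationDemo extends App {
  println(Sum(Field("amount", "INT")).valid)  // true
  println(Sum(Field("name", "STRING")).valid) // false: rejected before any RelNode is built
}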
This closes #1958

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0d543f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0d543f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0d543f8

Branch: refs/heads/master
Commit: f0d543f8cf95efecef88c77155456104a6d742b9
Parents: f2e6057
Author: Yijie Shen <henry.yijies...@gmail.com>
Authored: Wed Apr 13 16:46:58 2016 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed May 18 00:12:22 2016 +0200

----------------------------------------------------------------------
 .../flink/api/scala/table/expressionDsl.scala   |  74 ++--
 .../flink/api/table/BatchTableEnvironment.scala |  34 +-
 .../api/table/ExpressionParserException.scala   |  23 --
 .../flink/api/table/FlinkPlannerImpl.scala      |   4 +-
 .../api/table/StreamTableEnvironment.scala      |  17 +-
 .../flink/api/table/TableEnvironment.scala      |  43 ++-
 .../apache/flink/api/table/TableException.scala |  23 --
 .../org/apache/flink/api/table/exceptions.scala |  39 +++
 .../api/table/expressions/Expression.scala      |  64 ++--
 .../table/expressions/ExpressionParser.scala    |  42 +--
 .../api/table/expressions/InputTypeSpec.scala   |  55 +++
 .../flink/api/table/expressions/TreeNode.scala  | 120 -------
 .../api/table/expressions/aggregations.scala    |  33 +-
 .../api/table/expressions/arithmetic.scala      |  44 ++-
 .../flink/api/table/expressions/call.scala      |  75 +---
 .../flink/api/table/expressions/cast.scala      |  21 +-
 .../api/table/expressions/comparison.scala      |  41 ++-
 .../api/table/expressions/fieldExpression.scala |  73 +++-
 .../flink/api/table/expressions/literals.scala  |  14 +-
 .../flink/api/table/expressions/logic.scala     |  50 ++-
 .../api/table/expressions/mathExpressions.scala | 115 +++++++
 .../flink/api/table/expressions/ordering.scala  |  21 +-
 .../table/expressions/stringExpressions.scala   | 220 ++++++++++++
 .../api/table/plan/RexNodeTranslator.scala      |  50 ++-
 .../api/table/plan/logical/LogicalNode.scala    | 162 +++++++++
 .../api/table/plan/logical/operators.scala      | 339 +++++++++++++++++++
 .../org/apache/flink/api/table/table.scala      | 296 +++-------------
 .../apache/flink/api/table/trees/TreeNode.scala | 114 +++++++
 .../api/table/typeutils/TypeCheckUtils.scala    |  40 +++
 .../api/table/typeutils/TypeCoercion.scala      |  92 +++++
 .../table/validate/ExprValidationResult.scala   |  41 +++
 .../api/table/validate/FunctionCatalog.scala    | 124 +++++++
 .../api/java/batch/TableEnvironmentITCase.java  |  11 +-
 .../java/batch/table/AggregationsITCase.java    |   8 +-
 .../api/java/batch/table/ExpressionsITCase.java |   4 +-
 .../api/java/batch/table/FilterITCase.java      |   3 +-
 .../batch/table/GroupedAggregationsITCase.java  |   5 +-
 .../flink/api/java/batch/table/JoinITCase.java  |  10 +-
 .../api/java/batch/table/SelectITCase.java      |   5 +-
 .../batch/table/StringExpressionsITCase.java    |  32 +-
 .../flink/api/java/batch/table/UnionITCase.java |  10 +-
 .../scala/batch/TableEnvironmentITCase.scala    |   6 +-
 .../scala/batch/table/AggregationsITCase.scala  |   9 +-
 .../scala/batch/table/ExpressionsITCase.scala   |   4 +-
 .../api/scala/batch/table/FilterITCase.scala    |   4 +-
 .../batch/table/GroupedAggregationsITCase.scala |   6 +-
 .../api/scala/batch/table/JoinITCase.scala      |  10 +-
 .../api/scala/batch/table/SelectITCase.scala    |   8 +-
 .../batch/table/StringExpressionsITCase.scala   |   7 +-
 .../api/scala/batch/table/UnionITCase.scala     |   8 +-
 .../expression/utils/ExpressionEvaluator.scala  |  10 +-
 .../api/scala/stream/table/UnionITCase.scala    |  10 +-
 .../scala/stream/table/UnsupportedOpsTest.scala |   7 +
 53 files changed, 1915 insertions(+), 765 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 0f0b93c..11fb64a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -17,11 +17,11 @@
  */
 package org.apache.flink.api.scala.table
 
+import scala.language.implicitConversions
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.expressions._
 
-import scala.language.implicitConversions
-
 /**
  * These are all the operations that can be used to construct an [[Expression]] AST for expression
  * operations.
@@ -63,7 +63,7 @@ trait ImplicitExpressionOperations {
 
   def cast(toType: TypeInformation[_]) = Cast(expr, toType)
 
-  def as(name: Symbol) = Naming(expr, name.name)
+  def as(name: Symbol) = Alias(expr, name.name)
 
   def asc = Asc(expr)
   def desc = Desc(expr)
@@ -91,37 +91,37 @@ trait ImplicitExpressionOperations {
   /**
    * Calculates Euler's number raised to the given power.
    */
-  def exp() = Call(BuiltInFunctionNames.EXP, expr)
+  def exp() = Exp(expr)
 
   /**
    * Calculates the base-10 logarithm of the given value.
    */
-  def log10() = Call(BuiltInFunctionNames.LOG10, expr)
+  def log10() = Log10(expr)
 
   /**
    * Calculates the natural logarithm of the given value.
    */
-  def ln() = Call(BuiltInFunctionNames.LN, expr)
+  def ln() = Ln(expr)
 
   /**
    * Calculates the given number raised to the power of the other value.
    */
-  def power(other: Expression) = Call(BuiltInFunctionNames.POWER, expr, other)
+  def power(other: Expression) = Power(expr, other)
 
   /**
    * Calculates the absolute value of the given value.
    */
-  def abs() = Call(BuiltInFunctionNames.ABS, expr)
+  def abs() = Abs(expr)
 
   /**
    * Calculates the largest integer less than or equal to a given number.
    */
-  def floor() = Call(BuiltInFunctionNames.FLOOR, expr)
+  def floor() = Floor(expr)
 
   /**
    * Calculates the smallest integer greater than or equal to a given number.
    */
-  def ceil() = Call(BuiltInFunctionNames.CEIL, expr)
+  def ceil() = Ceil(expr)
 
   /**
    * Creates a substring of the given string between the given indices.
@@ -130,9 +130,8 @@ trait ImplicitExpressionOperations {
    * @param endIndex last character of the substring (starting at 1, inclusive)
    * @return substring
    */
-  def substring(beginIndex: Expression, endIndex: Expression) = {
-    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
-  }
+  def substring(beginIndex: Expression, endIndex: Expression) =
+    SubString(expr, beginIndex, endIndex)
 
   /**
    * Creates a substring of the given string beginning at the given index to the end.
@@ -140,9 +139,8 @@ trait ImplicitExpressionOperations {
    * @param beginIndex first character of the substring (starting at 1, inclusive)
    * @return substring
   */
-  def substring(beginIndex: Expression) = {
-    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
-  }
+  def substring(beginIndex: Expression) =
+    new SubString(expr, beginIndex)
 
   /**
    * Removes leading and/or trailing characters from the given string.
@@ -155,25 +153,13 @@ trait ImplicitExpressionOperations { def trim( removeLeading: Boolean = true, removeTrailing: Boolean = true, - character: Expression = BuiltInFunctionConstants.TRIM_DEFAULT_CHAR) = { + character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = { if (removeLeading && removeTrailing) { - Call( - BuiltInFunctionNames.TRIM, - BuiltInFunctionConstants.TRIM_BOTH, - character, - expr) + Trim(TrimConstants.TRIM_BOTH, character, expr) } else if (removeLeading) { - Call( - BuiltInFunctionNames.TRIM, - BuiltInFunctionConstants.TRIM_LEADING, - character, - expr) + Trim(TrimConstants.TRIM_LEADING, character, expr) } else if (removeTrailing) { - Call( - BuiltInFunctionNames.TRIM, - BuiltInFunctionConstants.TRIM_TRAILING, - character, - expr) + Trim(TrimConstants.TRIM_TRAILING, character, expr) } else { expr } @@ -182,51 +168,39 @@ trait ImplicitExpressionOperations { /** * Returns the length of a String. */ - def charLength() = { - Call(BuiltInFunctionNames.CHAR_LENGTH, expr) - } + def charLength() = CharLength(expr) /** * Returns all of the characters in a String in upper case using the rules of * the default locale. */ - def upperCase() = { - Call(BuiltInFunctionNames.UPPER_CASE, expr) - } + def upperCase() = Upper(expr) /** * Returns all of the characters in a String in lower case using the rules of * the default locale. */ - def lowerCase() = { - Call(BuiltInFunctionNames.LOWER_CASE, expr) - } + def lowerCase() = Lower(expr) /** * Converts the initial letter of each word in a String to uppercase. * Assumes a String containing only [A-Za-z0-9], everything else is treated as whitespace. */ - def initCap() = { - Call(BuiltInFunctionNames.INIT_CAP, expr) - } + def initCap() = InitCap(expr) /** * Returns true, if a String matches the specified LIKE pattern. * * e.g. "Jo_n%" matches all Strings that start with "Jo(arbitrary letter)n" */ - def like(pattern: Expression) = { - Call(BuiltInFunctionNames.LIKE, expr, pattern) - } + def like(pattern: Expression) = Like(expr, pattern) /** * Returns true, if a String matches the specified SQL regex pattern. * * e.g. 
"A+" matches all Strings that consist of at least one A */ - def similar(pattern: Expression) = { - Call(BuiltInFunctionNames.SIMILAR, expr, pattern) - } + def similar(pattern: Expression) = Similar(expr, pattern) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index 39e3105..207500a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs + import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{ExecutionEnvironment, DataSet} import org.apache.flink.api.java.io.DiscardingOutputFormat @@ -31,7 +32,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.table.explain.PlanJsonParser import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.PlanGenException -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, DataSetConvention} +import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataSetTable} import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink} @@ -72,7 +74,7 @@ abstract class BatchTableEnvironment( val m = internalNamePattern.findFirstIn(name) m match { case Some(_) => - throw new TableException(s"Illegal Table name. " + + throw new ValidationException(s"Illegal Table name. " + s"Please choose a name that does not contain the pattern $internalNamePattern") case None => } @@ -87,18 +89,15 @@ abstract class BatchTableEnvironment( * The table to scan must be registered in the [[TableEnvironment]]'s catalog. * * @param tableName The name of the table to scan. - * @throws TableException if no table is registered under the given name. + * @throws ValidationException if no table is registered under the given name. * @return The scanned table. 
   */
-  @throws[TableException]
+  @throws[ValidationException]
   def scan(tableName: String): Table = {
     if (isRegistered(tableName)) {
-      relBuilder.scan(tableName)
-      new Table(relBuilder.build(), this)
-    }
-    else {
-      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+      new Table(this, CatalogNode(tableName, getRowType(tableName)))
+    } else {
+      throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
     }
   }

@@ -133,7 +132,7 @@ abstract class BatchTableEnvironment(
     // transform to a relational tree
     val relational = planner.rel(validated)

-    new Table(relational.rel, this)
+    new Table(this, LogicalRelNode(relational.rel))
   }

   /**
@@ -169,7 +168,7 @@
    */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-    val ast = RelOptUtil.toString(table.relNode)
+    val ast = RelOptUtil.toString(table.getRelNode)
     val dataSet = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
@@ -219,15 +218,10 @@
    * @tparam T The type of the [[DataSet]].
    */
   protected def registerDataSetInternal[T](
-    name: String, dataSet: DataSet[T],
-    fields: Array[Expression]): Unit = {
+    name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {

-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields.toArray)
-    val dataSetTable = new DataSetTable[T](
-      dataSet,
-      fieldIndexes.toArray,
-      fieldNames.toArray
-    )
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
+    val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
     registerTableInternal(name, dataSetTable)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
deleted file mode 100644
index 2d6fae6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionParserException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * Exception for all errors occurring during expression evaluation.
- */ -class ExpressionParserException(msg: String) extends RuntimeException(msg) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala index 5a1b3fe..9d0a146 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala @@ -34,7 +34,7 @@ import org.apache.calcite.sql.parser.{SqlParser, SqlParseException} import org.apache.calcite.sql.validate.SqlValidator import org.apache.calcite.sql.{SqlNode, SqlOperatorTable} import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable} -import org.apache.calcite.tools.{RelConversionException, ValidationException, Frameworks, FrameworkConfig} +import org.apache.calcite.tools.{RelConversionException, ValidationException => CValidationException, Frameworks, FrameworkConfig} import org.apache.calcite.util.Util import scala.collection.JavaConversions._ @@ -96,7 +96,7 @@ class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) { } catch { case e: RuntimeException => { - throw new ValidationException(e) + throw new CValidationException(e) } } validatedSqlNode http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index be1c005..8ba3000 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -24,10 +24,12 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs + import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.PlanGenException -import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention} +import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink} import org.apache.flink.api.table.plan.schema. @@ -86,18 +88,17 @@ abstract class StreamTableEnvironment( * The table to ingest must be registered in the [[TableEnvironment]]'s catalog. * * @param tableName The name of the table to ingest. - * @throws TableException if no table is registered under the given name. + * @throws ValidationException if no table is registered under the given name. * @return The ingested table. 
*/ - @throws[TableException] + @throws[ValidationException] def ingest(tableName: String): Table = { if (isRegistered(tableName)) { - relBuilder.scan(tableName) - new Table(relBuilder.build(), this) + new Table(this, CatalogNode(tableName, getRowType(tableName))) } else { - throw new TableException(s"Table \'$tableName\' was not found in the registry.") + throw new ValidationException(s"Table \'$tableName\' was not found in the registry.") } } @@ -132,7 +133,7 @@ abstract class StreamTableEnvironment( // transform to a relational tree val relational = planner.rel(validated) - new Table(relational.rel, this) + new Table(this, LogicalRelNode(relational.rel)) } /** @@ -240,7 +241,7 @@ abstract class StreamTableEnvironment( */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { - val relNode = table.relNode + val relNode = table.getRelNode // decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(relNode) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index 6ccde47..8aa9e10 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -21,11 +21,13 @@ package org.apache.flink.api.table import java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.config.Lex -import org.apache.calcite.plan.RelOptPlanner +import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner} +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql.parser.SqlParser -import org.apache.calcite.tools.{Frameworks, FrameworkConfig, RelBuilder} +import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder} + import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv} @@ -35,11 +37,11 @@ import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv} import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression} +import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.api.table.plan.cost.DataSetCostFactory -import org.apache.flink.api.table.plan.schema.{TransStreamTable, RelTable} import org.apache.flink.api.table.sinks.TableSink -import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.api.table.plan.schema.{RelTable, TransStreamTable} +import org.apache.flink.api.table.validate.FunctionCatalog import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => 
ScalaStreamExecEnv} @@ -72,10 +74,16 @@ abstract class TableEnvironment(val config: TableConfig) { // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree. protected val relBuilder: RelBuilder = RelBuilder.create(frameworkConfig) - // the planner instance used to optimize queries of this TableEnvironment - private val planner: RelOptPlanner = relBuilder + private val cluster: RelOptCluster = relBuilder .values(Array("dummy"), new Integer(1)) - .build().getCluster.getPlanner + .build().getCluster + + // the planner instance used to optimize queries of this TableEnvironment + private val planner: RelOptPlanner = cluster.getPlanner + + private val typeFactory: RelDataTypeFactory = cluster.getTypeFactory + + private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuildIns // a counter for unique attribute names private val attrNameCntr: AtomicInteger = new AtomicInteger(0) @@ -94,7 +102,7 @@ abstract class TableEnvironment(val config: TableConfig) { // check that table belongs to this table environment if (table.tableEnv != this) { - throw new TableException( + throw new ValidationException( "Only tables that belong to this TableEnvironment can be registered.") } @@ -152,7 +160,7 @@ abstract class TableEnvironment(val config: TableConfig) { * * @param name The name under which the table is registered. * @param table The table to register in the catalog - * @throws TableException if another table is registered under the provided name. + * @throws ValidationException if another table is registered under the provided name. */ @throws[TableException] protected def registerTableInternal(name: String, table: AbstractTable): Unit = { @@ -182,6 +190,10 @@ abstract class TableEnvironment(val config: TableConfig) { tables.getTableNames.contains(name) } + protected def getRowType(name: String): RelDataType = { + tables.getTable(name).getRowType(typeFactory) + } + /** Returns a unique temporary attribute name. */ private[flink] def createUniqueAttributeName(): String = { "TMP_" + attrNameCntr.getAndIncrement() @@ -197,6 +209,10 @@ abstract class TableEnvironment(val config: TableConfig) { planner } + private[flink] def getFunctionCatalog: FunctionCatalog = { + functionCatalog + } + /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. 
*/ private[flink] def getFrameworkConfig: FrameworkConfig = { frameworkConfig @@ -253,7 +269,7 @@ abstract class TableEnvironment(val config: TableConfig) { case t: TupleTypeInfo[A] => exprs.zipWithIndex.map { case (UnresolvedFieldReference(name), idx) => (idx, name) - case (Naming(UnresolvedFieldReference(origName), name), _) => + case (Alias(UnresolvedFieldReference(origName), name), _) => val idx = t.getFieldIndex(origName) if (idx < 0) { throw new IllegalArgumentException(s"$origName is not a field of type $t") @@ -265,7 +281,7 @@ abstract class TableEnvironment(val config: TableConfig) { case c: CaseClassTypeInfo[A] => exprs.zipWithIndex.map { case (UnresolvedFieldReference(name), idx) => (idx, name) - case (Naming(UnresolvedFieldReference(origName), name), _) => + case (Alias(UnresolvedFieldReference(origName), name), _) => val idx = c.getFieldIndex(origName) if (idx < 0) { throw new IllegalArgumentException(s"$origName is not a field of type $c") @@ -276,7 +292,7 @@ abstract class TableEnvironment(val config: TableConfig) { } case p: PojoTypeInfo[A] => exprs.map { - case Naming(UnresolvedFieldReference(origName), name) => + case Alias(UnresolvedFieldReference(origName), name) => val idx = p.getFieldIndex(origName) if (idx < 0) { throw new IllegalArgumentException(s"$origName is not a field of type $p") @@ -389,5 +405,4 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } - } http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableException.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableException.scala deleted file mode 100644 index 3e298a4..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableException.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -/** - * General Exception for all errors during table handling. 
- */ -class TableException(msg: String) extends RuntimeException(msg) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala new file mode 100644 index 0000000..a3ab6fd --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +/** + * Exception for all errors occurring during expression parsing. + */ +case class ExpressionParserException(msg: String) extends RuntimeException(msg) + +/** + * General Exception for all errors during table handling. + */ +case class TableException(msg: String) extends RuntimeException(msg) + +/** + * Exception for all errors occurring during validation phase. + */ +case class ValidationException(msg: String) extends RuntimeException(msg) + +/** + * Exception for unwanted method calling on unresolved expression. + */ +case class UnresolvedException(msg: String) extends RuntimeException(msg) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala index 6960a9f..14e464e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala @@ -17,13 +17,34 @@ */ package org.apache.flink.api.table.expressions -import java.util.concurrent.atomic.AtomicInteger - import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -abstract class Expression extends TreeNode[Expression] { self: Product => - def name: String = Expression.freshName("expression") +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationSuccess} + +abstract class Expression extends TreeNode[Expression] { + /** + * Returns the [[TypeInformation]] for evaluating this expression. + * It is sometimes not available until the expression is valid. 
+   */
+  def resultType: TypeInformation[_]
+
+  /**
+    * One-pass validation of the expression tree in post order.
+    */
+  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+    * Checks input data types, the number of inputs, or other properties specified
+    * by this expression. Returns `ValidationSuccess` if the check passes,
+    * or `ValidationFailure` with a message explaining the error.
+    * Note: this method should only be called once `childrenValid == true`.
+    */
+  def validateInput(): ExprValidationResult = ValidationSuccess

   /**
    * Convert Expression to its counterpart in Calcite, i.e. RexNode
@@ -32,31 +53,36 @@ abstract class Expression extends TreeNode[Expression] { self: Product =>
     throw new UnsupportedOperationException(
       s"${this.getClass.getName} cannot be transformed to RexNode"
     )
+
+  def checkEquals(other: Expression): Boolean = {
+    if (this.getClass != other.getClass) {
+      false
+    } else {
+      def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
+        elements1.length == elements2.length && elements1.zip(elements2).forall {
+          case (e1: Expression, e2: Expression) => e1.checkEquals(e2)
+          case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
+          case (i1, i2) => i1 == i2
+        }
+      }
+      val elements1 = this.productIterator.toSeq
+      val elements2 = other.productIterator.toSeq
+      checkEquality(elements1, elements2)
+    }
+  }
 }

-abstract class BinaryExpression extends Expression { self: Product =>
+abstract class BinaryExpression extends Expression {
   def left: Expression
   def right: Expression
   def children = Seq(left, right)
 }

-abstract class UnaryExpression extends Expression { self: Product =>
+abstract class UnaryExpression extends Expression {
   def child: Expression
   def children = Seq(child)
 }

-abstract class LeafExpression extends Expression { self: Product =>
+abstract class LeafExpression extends Expression {
   val children = Nil
 }
-
-case class NopExpression() extends LeafExpression {
-  override val name = Expression.freshName("nop")
-}
-
-object Expression {
-  def freshName(prefix: String): String = {
-    s"$prefix-${freshNameCounter.getAndIncrement}"
-  }
-
-  val freshNameCounter = new AtomicInteger
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index ffadca5..db3d187 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -111,8 +111,8 @@
     stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral

-  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
-    case sym => UnresolvedFieldReference(sym)
+  lazy val fieldReference: PackratParser[NamedExpression] = ident ^^ {
+    sym => UnresolvedFieldReference(sym)
   }

   lazy val atom: PackratParser[Expression] =
@@ -155,7 +155,7 @@

   lazy val suffixAs: PackratParser[Expression] =
     composite ~ "." ~ AS ~ "(" ~ fieldReference ~ ")" ^^ {
-      case e ~ _ ~ _ ~ _ ~ target ~ _ => Naming(e, target.name)
+      case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.name)
     }

   lazy val suffixEval: PackratParser[Expression] =
@@ -165,27 +165,23 @@

   lazy val suffixFunctionCall =
     composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
-    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args : _*)
+    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
   }

   lazy val suffixTrim = composite ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~
     expression ~ ")" ^^ {
     case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
       val flag = trimType match {
-        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
-        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
-        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
+        case "BOTH" => TrimConstants.TRIM_BOTH
+        case "LEADING" => TrimConstants.TRIM_LEADING
+        case "TRAILING" => TrimConstants.TRIM_TRAILING
       }
-      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
+      Trim(flag, trimCharacter, operand)
   }

   lazy val suffixTrimWithoutArgs = composite <~ ".trim" ~ opt("()") ^^ {
     case e =>
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_BOTH,
-        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
-        e)
+      Trim(TrimConstants.TRIM_BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
   }

   lazy val suffixed: PackratParser[Expression] =
@@ -223,7 +219,7 @@

   lazy val prefixAs: PackratParser[Expression] =
     AS ~ "(" ~ expression ~ "," ~ fieldReference ~ ")" ^^ {
-      case _ ~ _ ~ e ~ _ ~ target ~ _ => Naming(e, target.name)
+      case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.name)
     }

   lazy val prefixEval: PackratParser[Expression] = composite ~
@@ -232,27 +228,23 @@
     }

   lazy val prefixFunctionCall = functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
-    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args: _*)
+    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
   }

   lazy val prefixTrim = "trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~
     expression ~ "," ~ expression ~ ")" ^^ {
     case _ ~ trimType ~ _ ~ trimCharacter ~ _ ~ operand ~ _ =>
       val flag = trimType match {
-        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
-        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
-        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
+        case "BOTH" => TrimConstants.TRIM_BOTH
+        case "LEADING" => TrimConstants.TRIM_LEADING
+        case "TRAILING" => TrimConstants.TRIM_TRAILING
       }
-      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
+      Trim(flag, trimCharacter, operand)
   }

   lazy val prefixTrimWithoutArgs = "trim(" ~ expression ~ ")" ^^ {
     case _ ~ operand ~ _ =>
-      Call(
-        BuiltInFunctionNames.TRIM,
-        BuiltInFunctionConstants.TRIM_BOTH,
-        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
-        operand)
+      Trim(TrimConstants.TRIM_BOTH, TrimConstants.TRIM_DEFAULT_CHAR, operand)
   }

   lazy val prefixed: PackratParser[Expression] =
@@ -322,7 +314,7 @@

   // alias
   lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
-    case e ~ _ ~ name => Naming(e, name.name)
+    case e ~ _ ~ name => Alias(e, name.name)
   } | logic

   lazy val expression: PackratParser[Expression] = alias
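For a sense of what the parser rewrite above amounts to, here is a self-contained toy version of the suffixTrim rule, runnable with the scala-parser-combinators module on the classpath. The Expr, FieldRef, Literal, and Trim case classes and the integer trim flag are simplified stand-ins for Flink's expression classes (in the real code the trim flags are Literal expressions from TrimConstants); only the combinator shape mirrors the actual rule.

import scala.util.parsing.combinator.JavaTokenParsers

sealed trait Expr
case class FieldRef(name: String) extends Expr
case class Literal(value: Any) extends Expr
case class Trim(flag: Int, character: Expr, operand: Expr) extends Expr

object TrimParser extends JavaTokenParsers {
  // 0 = BOTH, 1 = LEADING, 2 = TRAILING, matching the order of the constants
  private val flags = Map("BOTH" -> 0, "LEADING" -> 1, "TRAILING" -> 2)

  def field: Parser[Expr] = ident ^^ (name => FieldRef(name))

  // a string literal with the surrounding double quotes stripped
  def str: Parser[Expr] = stringLiteral ^^ (s => Literal(s.substring(1, s.length - 1)))

  // toy counterpart of suffixTrim: <field>.trim(<flag>, <character>)
  def suffixTrim: Parser[Expr] =
    field ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ str ~ ")" ^^ {
      case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
        Trim(flags(trimType), trimCharacter, operand)
    }

  def main(args: Array[String]): Unit = {
    // prints: [1.21] parsed: Trim(0,Literal( ),FieldRef(name))
    println(parseAll(suffixTrim, """name.trim(BOTH, " ")"""))
  }
}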
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
new file mode 100644
index 0000000..9cb52d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import scala.collection.mutable
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.validate._
+
+/**
+  * Expressions that impose an input type specification on their children.
+  */
+trait InputTypeSpec extends Expression {
+
+  /**
+    * Input type specification for each child.
+    *
+    * For example, [[Power]], which expects both of its children to be of Double type,
+    * should use:
+    * {{{
+    *   def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+    * }}}
+    */
+  def expectedTypes: Seq[TypeInformation[_]]
+
+  override def validateInput(): ExprValidationResult = {
+    val typeMismatches = mutable.ArrayBuffer.empty[String]
+    children.zip(expectedTypes).zipWithIndex.foreach { case ((e, tpe), i) =>
+      if (e.resultType != tpe) {
+        typeMismatches += s"expecting $tpe at input $i, got ${e.resultType}"
+      }
+    }
+    if (typeMismatches.isEmpty) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
deleted file mode 100644
index 9d4ca80..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TreeNode.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.expressions - -/** - * Generic base class for trees that can be transformed and traversed. - */ -abstract class TreeNode[A <: TreeNode[A]] { self: A with Product => - - /** - * List of child nodes that should be considered when doing transformations. Other values - * in the Product will not be transformed, only handed through. - */ - def children: Seq[A] - - /** - * Tests for equality by first testing for reference equality. - */ - def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other - - def transformPre(rule: PartialFunction[A, A]): A = { - val afterTransform = rule.applyOrElse(this, identity[A]) - - if (afterTransform fastEquals this) { - this.transformChildrenPre(rule) - } else { - afterTransform.transformChildrenPre(rule) - } - } - - def transformChildrenPre(rule: PartialFunction[A, A]): A = { - var changed = false - val newArgs = productIterator map { - case child: A if children.contains(child) => - val newChild = child.transformPre(rule) - if (newChild fastEquals child) { - child - } else { - changed = true - newChild - } - case other: AnyRef => other - case null => null - } toArray - - if (changed) makeCopy(newArgs) else this - } - - def transformPost(rule: PartialFunction[A, A]): A = { - val afterChildren = transformChildrenPost(rule) - if (afterChildren fastEquals this) { - rule.applyOrElse(this, identity[A]) - } else { - rule.applyOrElse(afterChildren, identity[A]) - } - } - - def transformChildrenPost(rule: PartialFunction[A, A]): A = { - var changed = false - val newArgs = productIterator map { - case child: A if children.contains(child) => - val newChild = child.transformPost(rule) - if (newChild fastEquals child) { - child - } else { - changed = true - newChild - } - case other: AnyRef => other - case null => null - } toArray - // toArray forces evaluation, toSeq does not seem to work here - - if (changed) makeCopy(newArgs) else this - } - - def exists(predicate: A => Boolean): Boolean = { - var exists = false - this.transformPre { - case e: A => if (predicate(e)) { - exists = true - } - e - } - exists - } - - /** - * Creates a new copy of this expression with new children. This is used during transformation - * if children change. This must be overridden by tree nodes that don't have the Constructor - * arguments in the same order as the `children`. 
- */ - def makeCopy(newArgs: Seq[AnyRef]): this.type = { - val defaultCtor = - this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head - try { - defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type] - } catch { - case iae: IllegalArgumentException => - println("IAE " + this) - throw new RuntimeException("Should never happen.") - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala index 8cd9dc3..24ce85f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala @@ -22,7 +22,10 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder import org.apache.calcite.tools.RelBuilder.AggCall -abstract sealed class Aggregation extends UnaryExpression { self: Product => +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.typeutils.TypeCheckUtils + +abstract sealed class Aggregation extends UnaryExpression { override def toString = s"Aggregate($child)" @@ -36,41 +39,59 @@ abstract sealed class Aggregation extends UnaryExpression { self: Product => } case class Sum(child: Expression) extends Aggregation { - override def toString = s"($child).sum" + override def toString = s"sum($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, child.toRexNode) } + + override def resultType = child.resultType + + override def validateInput = TypeCheckUtils.assertNumericExpr(child.resultType, "sum") } case class Min(child: Expression) extends Aggregation { - override def toString = s"($child).min" + override def toString = s"min($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, child.toRexNode) } + + override def resultType = child.resultType + + override def validateInput = TypeCheckUtils.assertOrderableExpr(child.resultType, "min") } case class Max(child: Expression) extends Aggregation { - override def toString = s"($child).max" + override def toString = s"max($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, child.toRexNode) } + + override def resultType = child.resultType + + override def validateInput = TypeCheckUtils.assertOrderableExpr(child.resultType, "max") } case class Count(child: Expression) extends Aggregation { - override def toString = s"($child).count" + override def toString = s"count($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, child.toRexNode) } + + override def resultType = BasicTypeInfo.LONG_TYPE_INFO } case class Avg(child: Expression) extends Aggregation { - override def toString = s"($child).avg" + override def toString = s"avg($child)" override def toAggCall(name: String)(implicit 
      relBuilder: RelBuilder): AggCall = {
     relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, child.toRexNode)
   }
+
+  override def resultType = child.resultType
+
+  override def validateInput = TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index ca67697..0ce4685 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -25,15 +25,34 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder

-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.table.typeutils.{TypeCheckUtils, TypeCoercion, TypeConverter}
+import org.apache.flink.api.table.validate._

-abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
+abstract class BinaryArithmetic extends BinaryExpression {
   def sqlOperator: SqlOperator

   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(sqlOperator, children.map(_.toRexNode))
   }
+
+  override def resultType: TypeInformation[_] =
+    TypeCoercion.widerTypeOf(left.resultType, right.resultType) match {
+      case Some(t) => t
+      case None =>
+        throw new RuntimeException("This should never happen.")
+    }
+
+  // TODO: tighten this rule once type coercion rules are implemented during validation
+  override def validateInput(): ExprValidationResult = {
+    if (!left.resultType.isInstanceOf[NumericTypeInfo[_]] ||
+        !right.resultType.isInstanceOf[NumericTypeInfo[_]]) {
+      ValidationFailure(s"$this requires both operands to be numeric, got " +
+        s"${left.resultType} and ${right.resultType}")
+    } else {
+      ValidationSuccess
+    }
+  }
 }

 case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
@@ -56,6 +75,20 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
       relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
     }
   }
+
+  // TODO: tighten this rule once type coercion rules are implemented during validation
+  override def validateInput(): ExprValidationResult = {
+    if (left.resultType == BasicTypeInfo.STRING_TYPE_INFO ||
+        right.resultType == BasicTypeInfo.STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else if (!left.resultType.isInstanceOf[NumericTypeInfo[_]] ||
+        !right.resultType.isInstanceOf[NumericTypeInfo[_]]) {
+      ValidationFailure(s"$this requires numeric or String input," +
+        s" got ${left.resultType} and ${right.resultType}")
+    } else {
+      ValidationSuccess
+    }
+  }
 }

 case class UnaryMinus(child: Expression) extends UnaryExpression {
@@ -64,6 +97,11 @@
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, child.toRexNode)
   }
+
+  override def resultType = child.resultType
+
+  override def validateInput():
ExprValidationResult = + TypeCheckUtils.assertNumericExpr(child.resultType, "unary minus") } case class Minus(left: Expression, right: Expression) extends BinaryArithmetic { http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala index e36a784..bf2e6ba 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala @@ -18,85 +18,28 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.SqlOperator -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.table.UnresolvedException +import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure} + /** * General expression for unresolved function calls. The function can be a built-in * scalar function or a user-defined scalar function. */ -case class Call(functionName: String, args: Expression*) extends Expression { +case class Call(functionName: String, args: Seq[Expression]) extends Expression { override def children: Seq[Expression] = args override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder.call( - BuiltInFunctionNames.toSqlOperator(functionName), - args.map(_.toRexNode): _*) + throw new UnresolvedException(s"trying to convert UnresolvedFunction $functionName to RexNode") } override def toString = s"\\$functionName(${args.mkString(", ")})" - override def makeCopy(newArgs: Seq[AnyRef]): this.type = { - val copy = Call( - newArgs.head.asInstanceOf[String], - newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*) - - copy.asInstanceOf[this.type] - } -} - -/** - * Enumeration of common function names. 
- */ -object BuiltInFunctionNames { - val SUBSTRING = "SUBSTRING" - val TRIM = "TRIM" - val CHAR_LENGTH = "CHARLENGTH" - val UPPER_CASE = "UPPERCASE" - val LOWER_CASE = "LOWERCASE" - val INIT_CAP = "INITCAP" - val LIKE = "LIKE" - val SIMILAR = "SIMILAR" - val MOD = "MOD" - val EXP = "EXP" - val LOG10 = "LOG10" - val POWER = "POWER" - val LN = "LN" - val ABS = "ABS" - val FLOOR = "FLOOR" - val CEIL = "CEIL" - - def toSqlOperator(name: String): SqlOperator = { - name match { - case BuiltInFunctionNames.SUBSTRING => SqlStdOperatorTable.SUBSTRING - case BuiltInFunctionNames.TRIM => SqlStdOperatorTable.TRIM - case BuiltInFunctionNames.CHAR_LENGTH => SqlStdOperatorTable.CHAR_LENGTH - case BuiltInFunctionNames.UPPER_CASE => SqlStdOperatorTable.UPPER - case BuiltInFunctionNames.LOWER_CASE => SqlStdOperatorTable.LOWER - case BuiltInFunctionNames.INIT_CAP => SqlStdOperatorTable.INITCAP - case BuiltInFunctionNames.LIKE => SqlStdOperatorTable.LIKE - case BuiltInFunctionNames.SIMILAR => SqlStdOperatorTable.SIMILAR_TO - case BuiltInFunctionNames.EXP => SqlStdOperatorTable.EXP - case BuiltInFunctionNames.LOG10 => SqlStdOperatorTable.LOG10 - case BuiltInFunctionNames.POWER => SqlStdOperatorTable.POWER - case BuiltInFunctionNames.LN => SqlStdOperatorTable.LN - case BuiltInFunctionNames.ABS => SqlStdOperatorTable.ABS - case BuiltInFunctionNames.MOD => SqlStdOperatorTable.MOD - case BuiltInFunctionNames.FLOOR => SqlStdOperatorTable.FLOOR - case BuiltInFunctionNames.CEIL => SqlStdOperatorTable.CEIL - case _ => ??? - } - } -} + override def resultType = + throw new UnresolvedException(s"calling resultType on UnresolvedFunction $functionName") -/** - * Enumeration of common function flags. - */ -object BuiltInFunctionConstants { - val TRIM_BOTH = Literal(0) - val TRIM_LEADING = Literal(1) - val TRIM_TRAILING = Literal(2) - val TRIM_DEFAULT_CHAR = Literal(" ") + override def validateInput(): ExprValidationResult = + ValidationFailure(s"Unresolved function call: $functionName") } http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala index fdad1f6..3b8b0e7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala @@ -21,18 +21,27 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.typeutils.{TypeCoercion, TypeConverter} +import org.apache.flink.api.table.validate._ -case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression { +case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression { - override def toString = s"$child.cast($tpe)" + override def toString = s"$child.cast($resultType)" override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(tpe)) + relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(resultType)) } - override def makeCopy(anyRefs: Seq[AnyRef]): this.type = { + 
     val child: Expression = anyRefs.head.asInstanceOf[Expression]
-    copy(child, tpe).asInstanceOf[this.type]
+    copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override def validateInput(): ExprValidationResult = {
+    if (TypeCoercion.canCast(child.resultType, resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType")
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index 124393c..63caeaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -24,24 +24,59 @@
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
-abstract class BinaryComparison extends BinaryExpression { self: Product =>
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo
+import org.apache.flink.api.table.validate._
+
+abstract class BinaryComparison extends BinaryExpression {
   def sqlOperator: SqlOperator
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(sqlOperator, children.map(_.toRexNode))
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
+
+  // TODO: tighten this rule once type coercion rules are implemented during validation
+  override def validateInput(): ExprValidationResult = (left.resultType, right.resultType) match {
+    case (STRING_TYPE_INFO, STRING_TYPE_INFO) => ValidationSuccess
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => ValidationSuccess
+    case (lType, rType) =>
+      ValidationFailure(
+        s"Comparison is only supported for Strings and numeric types, got $lType and $rType")
+  }
 }
 
 case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left === $right"
 
   val sqlOperator: SqlOperator = SqlStdOperatorTable.EQUALS
+
+  override def validateInput(): ExprValidationResult = (left.resultType, right.resultType) match {
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => ValidationSuccess
+    case (lType, rType) =>
+      if (lType != rType) {
+        ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
+      } else {
+        ValidationSuccess
+      }
+  }
 }
 
 case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left !== $right"
 
   val sqlOperator: SqlOperator = SqlStdOperatorTable.NOT_EQUALS
+
+  override def validateInput(): ExprValidationResult = (left.resultType, right.resultType) match {
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => ValidationSuccess
+    case (lType, rType) =>
+      if (lType != rType) {
+        ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
+      } else {
+        ValidationSuccess
+      }
+  }
 }
 
 case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
@@ -74,6 +109,8 @@ case class IsNull(child: Expression) extends UnaryExpression {
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.isNull(child.toRexNode)
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
 }
 
 case class IsNotNull(child: Expression) extends UnaryExpression {
@@ -82,4 +119,6 @@ case class IsNotNull(child: Expression) extends UnaryExpression {
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.isNotNull(child.toRexNode)
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
index 82f7653..24283d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -20,27 +20,88 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure}
+
+trait NamedExpression extends Expression {
+  def name: String
+  def toAttribute: Attribute
+}
+
+abstract class Attribute extends LeafExpression with NamedExpression {
+  override def toAttribute: Attribute = this
+
+  def withName(newName: String): Attribute
+}
+
+case class UnresolvedFieldReference(name: String) extends Attribute {
+  override def toString = "\"" + name
 
+  override def withName(newName: String): Attribute = UnresolvedFieldReference(newName)
+
+  override def resultType: TypeInformation[_] =
+    throw new UnresolvedException(s"calling resultType on ${this.getClass}")
+
+  override def validateInput(): ExprValidationResult =
+    ValidationFailure(s"Unresolved reference $name")
+}
+
+case class ResolvedFieldReference(
+    name: String,
+    resultType: TypeInformation[_]) extends Attribute {
+
+  override def toString = s"'$name"
+
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.field(name)
   }
-}
 
-case class ResolvedFieldReference(override val name: String) extends LeafExpression {
-  override def toString = s"'$name"
+  override def withName(newName: String): Attribute = {
+    if (newName == name) {
+      this
+    } else {
+      ResolvedFieldReference(newName, resultType)
+    }
+  }
 }
 
-case class Naming(child: Expression, override val name: String) extends UnaryExpression {
+case class Alias(child: Expression, name: String)
+  extends UnaryExpression with NamedExpression {
+
   override def toString = s"$child as '$name"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.alias(child.toRexNode, name)
   }
 
-  override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
+  override def resultType: TypeInformation[_] = child.resultType
+
+  override def makeCopy(anyRefs: Array[AnyRef]): this.type = {
     val child: Expression = anyRefs.head.asInstanceOf[Expression]
     copy(child, name).asInstanceOf[this.type]
   }
+
+  override def toAttribute: Attribute = {
+    if (valid) {
+      ResolvedFieldReference(name, child.resultType)
+    } else {
+      UnresolvedFieldReference(name)
+    }
+  }
+}
+
+case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
+
+  override def name: String =
+    throw new UnresolvedException("Invalid call to name on UnresolvedAlias")
+
+  override def toAttribute: Attribute =
+    throw new UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
+
+  override def resultType: TypeInformation[_] =
+    throw new UnresolvedException("Invalid call to resultType on UnresolvedAlias")
+
+  override lazy val valid = false
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index 1fbe5a3..9caec26 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -22,7 +22,6 @@
 import java.util.Date
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala.table.ImplicitExpressionOperations
 import org.apache.flink.api.table.typeutils.TypeConverter
 
 object Literal {
@@ -39,11 +38,7 @@
   }
 }
 
-case class Literal(value: Any, tpe: TypeInformation[_])
-  extends LeafExpression with ImplicitExpressionOperations {
-  def expr = this
-  def typeInfo = tpe
-
+case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
   override def toString = s"$value"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
@@ -51,13 +46,10 @@
   }
 }
 
-case class Null(tpe: TypeInformation[_]) extends LeafExpression {
-  def expr = this
-  def typeInfo = tpe
-
+case class Null(resultType: TypeInformation[_]) extends LeafExpression {
   override def toString = s"null"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(tpe))
+    relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(resultType))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
index 37a6597..90d3dbc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
@@ -21,25 +21,47 @@
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
-abstract class BinaryPredicate extends BinaryExpression { self: Product => }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.validate._
+
+abstract class BinaryPredicate extends BinaryExpression {
+  override def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$this only accepts children of Boolean type, " +
+        s"got ${left.resultType} and ${right.resultType}")
+    }
+  }
+}
 
 case class Not(child: Expression) extends UnaryExpression {
 
-  override val name = Expression.freshName("not-" + child.name)
-
   override def toString = s"!($child)"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.not(child.toRexNode)
   }
+
+  override def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Not only accepts a child of Boolean type, " +
+        s"got ${child.resultType}")
+    }
+  }
 }
 
 case class And(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def toString = s"$left && $right"
 
-  override val name = Expression.freshName(left.name + "-and-" + right.name)
-
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.and(left.toRexNode, right.toRexNode)
   }
@@ -49,8 +71,6 @@
 case class Or(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def toString = s"$left || $right"
 
-  override val name = Expression.freshName(left.name + "-or-" + right.name)
-
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.or(left.toRexNode, right.toRexNode)
   }
@@ -63,10 +83,9 @@ case class Eval(
   extends Expression {
   def children = Seq(condition, ifTrue, ifFalse)
 
-  override def toString = s"($condition)? $ifTrue : $ifFalse"
+  override def resultType = ifTrue.resultType
 
-  override val name = Expression.freshName("if-" + condition.name +
-    "-then-" + ifTrue.name + "-else-" + ifFalse.name)
+  override def toString = s"($condition)? $ifTrue : $ifFalse"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     val c = condition.toRexNode
@@ -74,4 +93,15 @@ case class Eval(
     val f = ifFalse.toRexNode
     relBuilder.call(SqlStdOperatorTable.CASE, c, t, f)
   }
+
+  override def validateInput(): ExprValidationResult = {
+    if (condition.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        ifTrue.resultType == ifFalse.resultType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"Eval should have a boolean condition and identical types for ifTrue and ifFalse, got " +
+        s"(${condition.resultType}, ${ifTrue.resultType}, ${ifFalse.resultType})")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
new file mode 100644
index 0000000..cf734f0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils
+import org.apache.flink.api.table.validate._
+
+case class Abs(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = child.resultType
+
+  override def validateInput(): ExprValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Abs")
+
+  override def toString(): String = s"abs($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.ABS, child.toRexNode)
+  }
+}
+
+case class Ceil(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Ceil")
+
+  override def toString(): String = s"ceil($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.CEIL, child.toRexNode)
+  }
+}
+
+case class Exp(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString(): String = s"exp($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.EXP, child.toRexNode)
+  }
+}
+
+
+case class Floor(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Floor")
+
+  override def toString(): String = s"floor($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.FLOOR, child.toRexNode)
+  }
+}
+
+case class Log10(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString(): String = s"log10($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LOG10, child.toRexNode)
+  }
+}
+
+case class Ln(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString(): String = s"ln($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LN, child.toRexNode)
+  }
+}
+
+case class Power(left: Expression, right: Expression) extends BinaryExpression with InputTypeSpec {
+  override def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+
+  override def toString(): String = s"pow($left, $right)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.POWER, left.toRexNode, right.toRexNode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
index 75fa078..887cf60 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
@@ -16,28 +16,39 @@
  * limitations under the License.
  */
 package org.apache.flink.api.table.expressions
+
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Ordering extends UnaryExpression { self: Product =>
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.validate._
+
+abstract class Ordering extends UnaryExpression {
+  override def validateInput(): ExprValidationResult = {
+    if (!child.isInstanceOf[NamedExpression]) {
+      ValidationFailure(s"Sort should only be based on a field reference")
+    } else {
+      ValidationSuccess
+    }
+  }
 }
 
 case class Asc(child: Expression) extends Ordering {
   override def toString: String = s"($child).asc"
 
-  override def name: String = child.name + "-asc"
-
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     child.toRexNode
   }
+
+  override def resultType: TypeInformation[_] = child.resultType
 }
 
 case class Desc(child: Expression) extends Ordering {
   override def toString: String = s"($child).desc"
 
-  override def name: String = child.name + "-desc"
-
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.desc(child.toRexNode)
   }
+
+  override def resultType: TypeInformation[_] = child.resultType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
new file mode 100644
index 0000000..11825f7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import scala.collection.JavaConversions._
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.validate._
+
+/**
+ * Returns the length of `str`.
+ */
+case class CharLength(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"CharLength only accepts String input, got ${child.resultType}")
+    }
+  }
+
+  override def toString(): String = s"($child).charLength()"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.CHAR_LENGTH, child.toRexNode)
+  }
+}
+
+/**
+ * Returns str with the first letter of each word in uppercase.
+ * All other letters are in lowercase. Words are delimited by white space.
+ */
+case class InitCap(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"InitCap only accepts String input, got ${child.resultType}")
+    }
+  }
+
+  override def toString(): String = s"($child).initCap()"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.INITCAP, child.toRexNode)
+  }
+}
+
+/**
+ * Returns true if `str` matches `pattern`.
+ */
+case class Like(str: Expression, pattern: Expression) extends BinaryExpression {
+  def left: Expression = str
+  def right: Expression = pattern
+
+  override def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Like only accepts (String, String) input, " +
+        s"got (${str.resultType}, ${pattern.resultType})")
+    }
+  }
+
+  override def toString(): String = s"($str).like($pattern)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LIKE, children.map(_.toRexNode))
+  }
+}
+
+/**
+ * Returns str with all characters changed to lowercase.
+ */
+case class Lower(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Lower only accepts String input, got ${child.resultType}")
+    }
+  }
+
+  override def toString(): String = s"($child).toLowerCase()"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LOWER, child.toRexNode)
+  }
+}
+
+/**
+ * Returns true if `str` is similar to `pattern`.
+ */
+case class Similar(str: Expression, pattern: Expression) extends BinaryExpression {
+  def left: Expression = str
+  def right: Expression = pattern
+
+  override def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Similar only accepts (String, String) input, " +
+        s"got (${str.resultType}, ${pattern.resultType})")
+    }
+  }
+
+  override def toString(): String = s"($str).similarTo($pattern)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.SIMILAR_TO, children.map(_.toRexNode))
+  }
+}
+
+/**
+ * Returns a substring of `str` from `begin` (inclusive) to `end` (exclusive).
+ */
+case class SubString(
+    str: Expression,
+    begin: Expression,
+    end: Expression) extends Expression with InputTypeSpec {
+
+  def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str))
+
+  override def children: Seq[Expression] = str :: begin :: end :: Nil
+
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override def toString(): String = s"$str.subString($begin, $end)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.SUBSTRING, children.map(_.toRexNode))
+  }
+}
+
+/**
+ * Trims `trimString` from `str` according to `trimFlag`:
+ * 0 for TRIM_BOTH, 1 for TRIM_LEADING and 2 for TRIM_TRAILING.
+ */
+case class Trim(
+    trimFlag: Expression,
+    trimString: Expression,
+    str: Expression) extends Expression with InputTypeSpec {
+
+  override def children: Seq[Expression] = trimFlag :: trimString :: str :: Nil
+
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString(): String = s"trim($trimFlag, $trimString, $str)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.TRIM, children.map(_.toRexNode))
+  }
+}
+
+/**
+ * Enumeration of trim flags.
+ */
+object TrimConstants {
+  val TRIM_BOTH = Literal(0)
+  val TRIM_LEADING = Literal(1)
+  val TRIM_TRAILING = Literal(2)
+  val TRIM_DEFAULT_CHAR = Literal(" ")
+}
+
+/**
+ * Returns str with all characters changed to uppercase.
+ */
+case class Upper(child: Expression) extends UnaryExpression {
+  override def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Upper only accepts String input, got ${child.resultType}")
+    }
+  }
+
+  override def toString(): String = s"($child).toUpperCase()"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.UPPER, child.toRexNode)
+  }
+}
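For readers who want to exercise the new validation phase by hand, here is a minimal sketch. It is an editorial illustration, not part of the commit, and it assumes only the signatures visible in this diff: Literal(value, resultType), CharLength(child), validateInput(), and the ValidationSuccess / ValidationFailure results in org.apache.flink.api.table.validate.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.table.expressions.{CharLength, Literal}
    import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}

    object ValidationSketch {
      def main(args: Array[String]): Unit = {
        // CharLength over a String literal passes validateInput() ...
        val ok = CharLength(Literal("hello", STRING_TYPE_INFO))
        // ... while CharLength over an Int literal is rejected with a message.
        val bad = CharLength(Literal(42, INT_TYPE_INFO))

        Seq(ok, bad).foreach { expr =>
          expr.validateInput() match {
            case ValidationSuccess      => println(s"$expr: ok, resultType = ${expr.resultType}")
            case ValidationFailure(msg) => println(s"$expr: $msg")
          }
        }
      }
    }

Validation happens on the expression tree itself, before any RelNode or RexNode is built, which is the point of this change.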
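Cast can be exercised the same way. Whether a particular conversion passes now depends on TypeCoercion.canCast, whose exact rules are not visible in this hunk, so the expectations in the comments below are assumptions rather than guarantees:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.table.expressions.{Cast, Literal}

    object CastValidationSketch {
      def main(args: Array[String]): Unit = {
        // Cast validates itself against TypeCoercion.canCast during validation,
        // instead of failing later during RexNode construction.
        val intToLong = Cast(Literal(1, INT_TYPE_INFO), LONG_TYPE_INFO)
        val boolToDouble = Cast(Literal(true, BOOLEAN_TYPE_INFO), DOUBLE_TYPE_INFO)

        println(intToLong.validateInput())    // presumably ValidationSuccess (numeric widening)
        println(boolToDouble.validateInput()) // whatever TypeCoercion.canCast decides
      }
    }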
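Finally, note that an unresolved Call no longer translates to a RexNode at all; it only reports a validation failure. Resolution against the new FunctionCatalog presumably happens during the validation phase, rewriting the call into a concrete expression such as Exp; that rewrite itself is not shown in this hunk:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.table.expressions.{Call, Literal}

    object UnresolvedCallSketch {
      def main(args: Array[String]): Unit = {
        // Before resolution, a Call is just a named node with child expressions.
        val call = Call("EXP", Seq(Literal(1.0, DOUBLE_TYPE_INFO)))
        // Reports: ValidationFailure("Unresolved function call: EXP");
        // calling resultType or toRexNode here would throw UnresolvedException.
        println(call.validateInput())
      }
    }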