http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala index f946ed9..095cf04 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala @@ -18,57 +18,49 @@ package org.apache.flink.api.table.plan -import org.apache.calcite.tools.RelBuilder.AggCall import org.apache.flink.api.table.TableEnvironment - import org.apache.flink.api.table.expressions._ object RexNodeTranslator { /** - * Extracts all aggregation expressions (zero, one, or more) from an expression, translates - * these aggregation expressions into Calcite AggCalls, and replaces the original aggregation - * expressions by field accesses expressions. + * Extracts all aggregation expressions (zero, one, or more) from an expression, + * and replaces the original aggregation expressions by field accesses expressions. 
*/ - def extractAggCalls( + def extractAggregations( exp: Expression, - tableEnv: TableEnvironment): Pair[Expression, List[AggCall]] = { - - val relBuilder = tableEnv.getRelBuilder + tableEnv: TableEnvironment): Pair[Expression, List[NamedExpression]] = { exp match { case agg: Aggregation => val name = tableEnv.createUniqueAttributeName() - val aggCall = agg.toAggCall(name)(relBuilder) + val aggCall = Alias(agg, name) val fieldExp = new UnresolvedFieldReference(name) (fieldExp, List(aggCall)) - case n@Naming(agg: Aggregation, name) => - val aggCall = agg.toAggCall(name)(relBuilder) + case n @ Alias(agg: Aggregation, name) => val fieldExp = new UnresolvedFieldReference(name) - (fieldExp, List(aggCall)) + (fieldExp, List(n)) case l: LeafExpression => (l, Nil) case u: UnaryExpression => - val c = extractAggCalls(u.child, tableEnv) - (u.makeCopy(List(c._1)), c._2) + val c = extractAggregations(u.child, tableEnv) + (u.makeCopy(Array(c._1)), c._2) case b: BinaryExpression => - val l = extractAggCalls(b.left, tableEnv) - val r = extractAggCalls(b.right, tableEnv) - (b.makeCopy(List(l._1, r._1)), l._2 ::: r._2) - case e: Eval => - val c = extractAggCalls(e.condition, tableEnv) - val t = extractAggCalls(e.ifTrue, tableEnv) - val f = extractAggCalls(e.ifFalse, tableEnv) - (e.makeCopy(List(c._1, t._1, f._1)), c._2 ::: t._2 ::: f._2) + val l = extractAggregations(b.left, tableEnv) + val r = extractAggregations(b.right, tableEnv) + (b.makeCopy(Array(l._1, r._1)), l._2 ::: r._2) // Scalar functions - case c@Call(name, args@_*) => - val newArgs = args.map(extractAggCalls(_, tableEnv)).toList - (c.makeCopy(name :: newArgs.map(_._1)), newArgs.flatMap(_._2)) + case c @ Call(name, args) => + val newArgs = args.map(extractAggregations(_, tableEnv)) + (c.makeCopy((name :: newArgs.map(_._1) :: Nil).toArray), newArgs.flatMap(_._2).toList) - case e@AnyRef => - throw new IllegalArgumentException( - s"Expression $e of type ${e.getClass} not supported yet") + case e: Expression => + val 
newArgs = e.productIterator.map { + case arg: Expression => + extractAggregations(arg, tableEnv) + } + (e.makeCopy(newArgs.map(_._1).toArray), newArgs.flatMap(_._2).toList) } } }
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala new file mode 100644 index 0000000..dae02bd --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.table.{TableEnvironment, ValidationException} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.typeutils.TypeCoercion +import org.apache.flink.api.table.validate._ + +/** + * LogicalNode is created and validated as we construct query plan using Table API. 
+ * + * The main validation procedure is separated into two phases: + * + * Expressions' resolution and transformation ([[resolveExpressions]]): + * + * - translate [[UnresolvedFieldReference]] into [[ResolvedFieldReference]] + * using child operator's output + * - translate [[Call]](UnresolvedFunction) into solid Expression + * - generate alias names for query output + * - .... + * + * LogicalNode validation ([[validate]]): + * + * - check that no [[UnresolvedFieldReference]] remains + * - check that all expressions have children of the needed type + * - check that each logical operator has the desired input + * + * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode. + */ +abstract class LogicalNode extends TreeNode[LogicalNode] { + def output: Seq[Attribute] + + def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { + // resolve references and function calls + val exprResolved = expressionPostOrderTransform { + case u @ UnresolvedFieldReference(name) => + resolveReference(name).getOrElse(u) + case c @ Call(name, children) if c.childrenValid => + tableEnv.getFunctionCatalog.lookupFunction(name, children) + } + + exprResolved.expressionPostOrderTransform { + case ips: InputTypeSpec if ips.childrenValid => + var changed: Boolean = false + val newChildren = ips.expectedTypes.zip(ips.children).map { case (tpe, child) => + val childType = child.resultType + if (childType != tpe && TypeCoercion.canSafelyCast(childType, tpe)) { + changed = true + Cast(child, tpe) + } else { + child + } + }.toArray[AnyRef] + if (changed) ips.makeCopy(newChildren) else ips + } + } + + final def toRelNode(relBuilder: RelBuilder): RelNode = construct(relBuilder).build() + + protected[logical] def construct(relBuilder: RelBuilder): RelBuilder + + def validate(tableEnv: TableEnvironment): LogicalNode = { + val resolvedNode = resolveExpressions(tableEnv) + resolvedNode.expressionPostOrderTransform { + case a: Attribute if !a.valid => + val from = 
children.flatMap(_.output).map(_.name).mkString(", ") + failValidation(s"cannot resolve [${a.name}] given input [$from]") + + case e: Expression if e.validateInput().isFailure => + failValidation(s"Expression $e failed on input check: " + + s"${e.validateInput().asInstanceOf[ValidationFailure].message}") + } + } + + /** + * Resolves the given name to a [[NamedExpression]] using the output of all child + * nodes of this LogicalNode. + */ + def resolveReference(name: String): Option[NamedExpression] = { + val childrenOutput = children.flatMap(_.output) + val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(name)) + if (candidates.length > 1) { + failValidation(s"Reference $name is ambiguous") + } else if (candidates.length == 0) { + None + } else { + Some(candidates.head.withName(name)) + } + } + + /** + * Runs [[postOrderTransform]] with `rule` on all expressions present in this logical node. + * + * @param rule the rule to be applied to every expression in this logical node. 
+ */ + def expressionPostOrderTransform(rule: PartialFunction[Expression, Expression]): LogicalNode = { + var changed = false + + def expressionPostOrderTransform(e: Expression): Expression = { + val newExpr = e.postOrderTransform(rule) + if (newExpr.fastEquals(e)) { + e + } else { + changed = true + newExpr + } + } + + val newArgs = productIterator.map { + case e: Expression => expressionPostOrderTransform(e) + case Some(e: Expression) => Some(expressionPostOrderTransform(e)) + case seq: Traversable[_] => seq.map { + case e: Expression => expressionPostOrderTransform(e) + case other => other + } + case other: AnyRef => other + }.toArray + + if (changed) makeCopy(newArgs) else this + } + + protected def failValidation(msg: String): Nothing = { + throw new ValidationException(msg) + } +} + +abstract class LeafNode extends LogicalNode { + override def children: Seq[LogicalNode] = Nil +} + +abstract class UnaryNode extends LogicalNode { + def child: LogicalNode + + override def children: Seq[LogicalNode] = child :: Nil +} + +abstract class BinaryNode extends LogicalNode { + def left: LogicalNode + def right: LogicalNode + + override def children: Seq[LogicalNode] = left :: right :: Nil +} http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala new file mode 100644 index 0000000..d347651 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table._ +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { + val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] + val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => + e match { + case u @ UnresolvedAlias(child) => child match { + case ne: NamedExpression => ne + case e if !e.valid => u + case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp") + case other => Alias(other, s"_c$i") + } + case _ => throw 
new IllegalArgumentException + } + } + Project(newProjectList, child) + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = { + val resolvedProject = super.validate(tableEnv).asInstanceOf[Project] + + def checkUniqueNames(exprs: Seq[Expression]): Unit = { + val names: mutable.Set[String] = mutable.Set() + exprs.foreach { + case n: Alias => + // explicit name + if (names.contains(n.name)) { + throw new ValidationException(s"Duplicate field name $n.name.") + } else { + names.add(n.name) + } + case r: ResolvedFieldReference => + // simple field forwarding + if (names.contains(r.name)) { + throw new ValidationException(s"Duplicate field name $r.name.") + } else { + names.add(r.name) + } + case _ => // Do nothing + } + } + checkUniqueNames(resolvedProject.projectList) + resolvedProject + } + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + val allAlias = projectList.forall(_.isInstanceOf[Alias]) + child.construct(relBuilder) + if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( + LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) + } else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) + } + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = + throw new UnresolvedException("Invalid call to output on AliasNode") + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = + throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { + if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") + } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("Alias only accept name expressions as arguments") + } else { + val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name) + val input = child.output + Project( + names.zip(input).map { case (name, attr) => + Alias(attr, name)} ++ input.drop(names.length), child) + } + } +} + +case class Distinct(child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = child.output + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + child.construct(relBuilder) + relBuilder.distinct() + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + throw new TableException(s"Distinct on stream tables is currently not supported.") + } + this + } +} + +case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = child.output + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + child.construct(relBuilder) + relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava) + } + + override def 
validate(tableEnv: TableEnvironment): LogicalNode = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + throw new TableException(s"Distinct on stream tables is currently not supported.") + } + super.validate(tableEnv) + } +} + +case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = child.output + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + child.construct(relBuilder) + relBuilder.filter(condition.toRexNode(relBuilder)) + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = { + val resolvedFilter = super.validate(tableEnv).asInstanceOf[Filter] + if (resolvedFilter.condition.resultType != BOOLEAN_TYPE_INFO) { + failValidation(s"filter expression ${resolvedFilter.condition} of" + + s" ${resolvedFilter.condition.resultType} is not a boolean") + } + resolvedFilter + } +} + +case class Aggregate( + groupingExpressions: Seq[Expression], + aggregateExpressions: Seq[NamedExpression], + child: LogicalNode) extends UnaryNode { + + override def output: Seq[Attribute] = { + (groupingExpressions ++ aggregateExpressions) map { agg => + agg match { + case ne: NamedExpression => ne.toAttribute + case e => Alias(e, e.toString).toAttribute + } + } + } + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + child.construct(relBuilder) + relBuilder.aggregate( + relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava), + aggregateExpressions.map { e => + e match { + case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder) + case _ => throw new RuntimeException("This should never happen.") + } + }.asJava) + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + throw new TableException(s"Aggregate on stream tables is currently not supported.") + } + + val resolvedAggregate = 
super.validate(tableEnv).asInstanceOf[Aggregate] + val groupingExprs = resolvedAggregate.groupingExpressions + val aggregateExprs = resolvedAggregate.aggregateExpressions + aggregateExprs.foreach(validateAggregateExpression) + groupingExprs.foreach(validateGroupingExpression) + + def validateAggregateExpression(expr: Expression): Unit = expr match { + // check no nested aggregation exists. + case aggExpr: Aggregation => + aggExpr.children.foreach { child => + child.preOrderVisit { + case agg: Aggregation => + failValidation( + "It's not allowed to use an aggregate function as " + + "input of another aggregate function") + case _ => // OK + } + } + case a: Attribute if !groupingExprs.exists(_.checkEquals(a)) => + failValidation( + s"expression '$a' is invalid because it is neither" + + " present in group by nor an aggregate function") + case e if groupingExprs.exists(_.checkEquals(e)) => // OK + case e => e.children.foreach(validateAggregateExpression) + } + + def validateGroupingExpression(expr: Expression): Unit = { + if (!expr.resultType.isKeyType) { + failValidation( + s"expression $expr cannot be used as a grouping expression " + + "because it's not a valid key type") + } + } + resolvedAggregate + } +} + +case class Union(left: LogicalNode, right: LogicalNode) extends BinaryNode { + override def output: Seq[Attribute] = left.output + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + left.construct(relBuilder) + right.construct(relBuilder) + relBuilder.union(true) + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = { + val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union] + if (left.output.length != right.output.length) { + failValidation(s"Union two table of different column sizes:" + + s" ${left.output.size} and ${right.output.size}") + } + val sameSchema = left.output.zip(right.output).forall { case (l, r) => + l.resultType == r.resultType && l.name == r.name } + if (!sameSchema) { + 
failValidation(s"Union two table of different schema:" + + s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" + + s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]") + } + resolvedUnion + } +} + +case class Join( + left: LogicalNode, + right: LogicalNode, + joinType: JoinType, + condition: Option[Expression]) extends BinaryNode { + + override def output: Seq[Attribute] = { + joinType match { + case JoinType.INNER => left.output ++ right.output + case j => throw new ValidationException(s"Unsupported JoinType: $j") + } + } + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + joinType match { + case JoinType.INNER => + left.construct(relBuilder) + right.construct(relBuilder) + relBuilder.join(JoinRelType.INNER, + condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true))) + case _ => + throw new ValidationException(s"Unsupported JoinType: $joinType") + } + } + + private def ambiguousName: Set[String] = + left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet) + + override def validate(tableEnv: TableEnvironment): LogicalNode = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + throw new TableException(s"Join on stream tables is currently not supported.") + } + + val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join] + if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) { + failValidation(s"filter expression ${resolvedJoin.condition} is not a boolean") + } else if (!ambiguousName.isEmpty) { + failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}") + } + resolvedJoin + } +} + +case class CatalogNode( + tableName: String, + rowType: RelDataType) extends LeafNode { + + val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field => + ResolvedFieldReference( + field.getName, TypeConverter.sqlTypeToTypeInfo(field.getType.getSqlTypeName)) + } + + override protected[logical] def 
construct(relBuilder: RelBuilder): RelBuilder = { + relBuilder.scan(tableName) + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = this +} + +/** + * Wrapper for valid logical plans generated from SQL String. + */ +case class LogicalRelNode( + relNode: RelNode) extends LeafNode { + + val output: Seq[Attribute] = relNode.getRowType.getFieldList.asScala.map { field => + ResolvedFieldReference( + field.getName, TypeConverter.sqlTypeToTypeInfo(field.getType.getSqlTypeName)) + } + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { + relBuilder.push(relNode) + } + + override def validate(tableEnv: TableEnvironment): LogicalNode = this +} http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 5356a9d..4f111c9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -17,28 +17,18 @@ */ package org.apache.flink.api.table +import scala.collection.JavaConverters._ + import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataTypeField -import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rel.logical.LogicalProject -import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode} -import org.apache.calcite.sql.SqlKind -import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} -import org.apache.calcite.util.NlsString + import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.plan.PlanGenException -import 
org.apache.flink.api.table.plan.RexNodeTranslator.extractAggCalls +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.plan.logical._ import org.apache.flink.api.table.sinks.TableSink import org.apache.flink.api.table.typeutils.TypeConverter -import scala.collection.mutable -import scala.collection.JavaConverters._ - -case class BaseTable( - private[flink] val relNode: RelNode, - private[flink] val tableEnv: TableEnvironment) - /** * A Table is the core component of the Table API. * Similar to how the batch and streaming APIs have DataSet and DataStream, @@ -66,18 +56,16 @@ case class BaseTable( * in a Scala DSL or as an expression String. Please refer to the documentation for the expression * syntax. * - * @param relNode The root node of the relational Calcite [[RelNode]] tree. * @param tableEnv The [[TableEnvironment]] to which the table is bound. + * @param logicalPlan */ class Table( - private[flink] override val relNode: RelNode, - private[flink] override val tableEnv: TableEnvironment) - extends BaseTable(relNode, tableEnv) -{ + private[flink] val tableEnv: TableEnvironment, + private[flink] val logicalPlan: LogicalNode) { def relBuilder = tableEnv.getRelBuilder - def getRelNode: RelNode = relNode + def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder) /** * Performs a selection operation. Similar to an SQL SELECT statement. 
The field expressions @@ -90,44 +78,17 @@ class Table( * }}} */ def select(fields: Expression*): Table = { - - checkUniqueNames(fields) - - relBuilder.push(relNode) - - // separate aggregations and selection expressions - val extractedAggCalls: List[(Expression, List[AggCall])] = fields - .map(extractAggCalls(_, tableEnv)).toList - - // get aggregation calls - val aggCalls: List[AggCall] = extractedAggCalls.flatMap(_._2) - - // apply aggregations - if (aggCalls.nonEmpty) { - // aggregation on stream table is not currently supported - tableEnv match { - case _: StreamTableEnvironment => - throw new TableException("Aggregation on stream tables is currently not supported.") - case _ => - val emptyKey: GroupKey = relBuilder.groupKey() - relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava) - } - } - - // get selection expressions - val exprs: List[RexNode] = extractedAggCalls.map(_._1.toRexNode(relBuilder)) - - relBuilder.project(exprs.toIterable.asJava) - val projected = relBuilder.build() - - if(relNode == projected) { - // Calcite's RelBuilder does not translate identity projects even if they rename fields. - // Add a projection ourselves (will be automatically removed by translation rules). - new Table(createRenamingProject(exprs), tableEnv) + val projectionOnAggregates = fields.map(extractAggregations(_, tableEnv)) + val aggregations = projectionOnAggregates.flatMap(_._2) + if (aggregations.nonEmpty) { + new Table(tableEnv, + Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)), + Aggregate(Nil, aggregations, logicalPlan).validate(tableEnv)).validate(tableEnv)) } else { - new Table(projected, tableEnv) + new Table(tableEnv, + Project( + projectionOnAggregates.map(e => UnresolvedAlias(e._1)), logicalPlan).validate(tableEnv)) } - } /** @@ -156,30 +117,7 @@ class Table( * }}} */ def as(fields: Expression*): Table = { - - val curNames = relNode.getRowType.getFieldNames.asScala - - // validate that AS has only field references - if (! 
fields.forall( _.isInstanceOf[UnresolvedFieldReference] )) { - throw new IllegalArgumentException("All expressions must be field references.") - } - // validate that we have not more field references than fields - if ( fields.length > curNames.size) { - throw new IllegalArgumentException("More field references than fields.") - } - - val curFields = curNames.map(new UnresolvedFieldReference(_)) - - val renamings = fields.zip(curFields).map { - case (newName, oldName) => new Naming(oldName, newName.name) - } - val remaining = curFields.drop(fields.size) - - relBuilder.push(relNode) - - val exprs = (renamings ++ remaining).map(_.toRexNode(relBuilder)) - - new Table(createRenamingProject(exprs), tableEnv) + new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv)) } /** @@ -208,11 +146,7 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { - - relBuilder.push(relNode) - relBuilder.filter(predicate.toRexNode(relBuilder)) - - new Table(relBuilder.build(), tableEnv) + new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) } /** @@ -269,19 +203,10 @@ class Table( * }}} */ def groupBy(fields: Expression*): GroupedTable = { - - // group by on stream tables is currently not supported - tableEnv match { - case _: StreamTableEnvironment => - throw new TableException("Group by on stream tables is currently not supported.") - case _ => { - relBuilder.push(relNode) - val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava - val groupKey = relBuilder.groupKey(groupExpr) - - new GroupedTable(relBuilder.build(), tableEnv, groupKey) - } + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + throw new TableException(s"Group by on stream tables is currently not supported.") } + new GroupedTable(this, fields) } /** @@ -309,15 +234,7 @@ class Table( * }}} */ def distinct(): Table = { - // distinct on stream table is not currently supported - tableEnv match { - case _: StreamTableEnvironment => - throw new 
TableException("Distinct on stream tables is currently not supported.") - case _ => - relBuilder.push(relNode) - relBuilder.distinct() - new Table(relBuilder.build(), tableEnv) - } + new Table(tableEnv, Distinct(logicalPlan).validate(tableEnv)) } /** @@ -334,32 +251,12 @@ class Table( * }}} */ def join(right: Table): Table = { - - // join on stream tables is currently not supported - tableEnv match { - case _: StreamTableEnvironment => - throw new TableException("Join on stream tables is currently not supported.") - case _ => { - // check that right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new TableException("Only tables from the same TableEnvironment can be joined.") - } - - // check that join inputs do not have overlapping field names - val leftFields = relNode.getRowType.getFieldNames.asScala.toSet - val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet - if (leftFields.intersect(rightFields).nonEmpty) { - throw new IllegalArgumentException("Overlapping fields names on join input.") - } - - relBuilder.push(relNode) - relBuilder.push(right.relNode) - - relBuilder.join(JoinRelType.INNER, relBuilder.literal(true)) - val join = relBuilder.build() - new Table(join, tableEnv) - } + // check that right table belongs to the same TableEnvironment + if (right.tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be joined.") } + new Table(tableEnv, + Join(this.logicalPlan, right.logicalPlan, JoinType.INNER, None).validate(tableEnv)) } /** @@ -375,32 +272,11 @@ class Table( * }}} */ def unionAll(right: Table): Table = { - // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { - throw new TableException("Only tables from the same TableEnvironment can be unioned.") - } - - val leftRowType: List[RelDataTypeField] = relNode.getRowType.getFieldList.asScala.toList - val rightRowType: List[RelDataTypeField] = 
right.relNode.getRowType.getFieldList.asScala.toList - - if (leftRowType.length != rightRowType.length) { - throw new IllegalArgumentException("Unioned tables have varying row schema.") + throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") } - else { - val zipped: List[(RelDataTypeField, RelDataTypeField)] = leftRowType.zip(rightRowType) - zipped.foreach { case (x, y) => - if (!x.getName.equals(y.getName) || x.getType != y.getType) { - throw new IllegalArgumentException("Unioned tables have varying row schema.") - } - } - } - - relBuilder.push(relNode) - relBuilder.push(right.relNode) - - relBuilder.union(true) - new Table(relBuilder.build(), tableEnv) + new Table(tableEnv, Union(logicalPlan, right.logicalPlan).validate(tableEnv)) } /** @@ -414,22 +290,13 @@ class Table( * }}} */ def orderBy(fields: Expression*): Table = { - relBuilder.push(relNode) - - if (! fields.forall { - case x : UnresolvedFieldReference => true - case x : Ordering => x.child.isInstanceOf[UnresolvedFieldReference] - case _ => false - }) { - throw new IllegalArgumentException("All expressions must be field references " + - "or asc/desc expressions.") + val order: Seq[Ordering] = fields.map { case e => + e match { + case o: Ordering => o + case _ => Asc(e) + } } - - val exprs = fields.map(_.toRexNode(relBuilder)) - - relBuilder.sort(exprs.asJava) - new Table(relBuilder.build(), tableEnv) - + new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv)) } /** @@ -460,7 +327,7 @@ class Table( def toSink[T](sink: TableSink[T]): Unit = { // get schema information of table - val rowType = relNode.getRowType + val rowType = getRelNode.getRowType val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala .map(f => TypeConverter.sqlTypeToTypeInfo(f.getType.getSqlTypeName)).toArray @@ -471,62 +338,14 @@ class Table( // emit the table to the configured table sink 
tableEnv.emitToSink(this, configuredSink) } - - private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = { - - val names = exprs.map{ e => - e.getKind match { - case SqlKind.AS => - e.asInstanceOf[RexCall].getOperands.get(1) - .asInstanceOf[RexLiteral].getValue - .asInstanceOf[NlsString].getValue - case SqlKind.INPUT_REF => - relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex) - case _ => - throw new PlanGenException("Unexpected expression type encountered.") - } - - } - LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava) - } - - private def checkUniqueNames(exprs: Seq[Expression]): Unit = { - val names: mutable.Set[String] = mutable.Set() - - exprs.foreach { - case n: Naming => - // explicit name - if (names.contains(n.name)) { - throw new IllegalArgumentException(s"Duplicate field name $n.name.") - } else { - names.add(n.name) - } - case u: UnresolvedFieldReference => - // simple field forwarding - if (names.contains(u.name)) { - throw new IllegalArgumentException(s"Duplicate field name $u.name.") - } else { - names.add(u.name) - } - case _ => // Do nothing - } - } - } /** * A table that has been grouped on a set of grouping keys. - * - * @param relNode The root node of the relational Calcite [[RelNode]] tree. - * @param tableEnv The [[TableEnvironment]] to which the table is bound. - * @param groupKey The Calcite [[GroupKey]] of this table. */ class GroupedTable( - private[flink] override val relNode: RelNode, - private[flink] override val tableEnv: TableEnvironment, - private[flink] val groupKey: GroupKey) extends BaseTable(relNode, tableEnv) { - - def relBuilder = tableEnv.getRelBuilder + private[flink] val table: Table, + private[flink] val groupKey: Seq[Expression]) { /** * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. 
@@ -540,31 +359,19 @@ class GroupedTable( */ def select(fields: Expression*): Table = { - relBuilder.push(relNode) - - // separate aggregations and selection expressions - val extractedAggCalls: List[(Expression, List[AggCall])] = fields - .map(extractAggCalls(_, tableEnv)).toList - - // get aggregation calls - val aggCalls: List[AggCall] = extractedAggCalls.flatMap(_._2) + val projectionOnAggregates = fields.map(extractAggregations(_, table.tableEnv)) + val aggregations = projectionOnAggregates.flatMap(_._2) - // apply aggregations - relBuilder.aggregate(groupKey, aggCalls.toIterable.asJava) - - // get selection expressions - val exprs: List[RexNode] = try { - extractedAggCalls.map(_._1.toRexNode(relBuilder)) - } catch { - case iae: IllegalArgumentException => - throw new IllegalArgumentException( - "Only grouping fields and aggregations allowed after groupBy.", iae) - case e: Exception => throw e + val logical = if (aggregations.nonEmpty) { + Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)), + Aggregate(groupKey, aggregations, table.logicalPlan).validate(table.tableEnv) + ) + } else { + Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)), + Aggregate(groupKey, Nil, table.logicalPlan).validate(table.tableEnv)) } - relBuilder.project(exprs.toIterable.asJava) - - new Table(relBuilder.build(), tableEnv) + new Table(table.tableEnv, logical.validate(table.tableEnv)) } /** @@ -581,5 +388,4 @@ class GroupedTable( val fieldExprs = ExpressionParser.parseExpressionList(fields) select(fieldExprs: _*) } - } http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala new file mode 100644 index 
0000000..63c7013 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.trees + +import org.apache.commons.lang.ClassUtils + +/** + * Generic base class for trees that can be transformed and traversed. + */ +abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A => + + /** + * List of child nodes that should be considered when doing transformations. Other values + * in the Product will not be transformed, only handed through. + */ + def children: Seq[A] + + /** + * Tests for equality by first testing for reference equality. + */ + def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other + + /** + * Do tree transformation in post order. 
+ */ + def postOrderTransform(rule: PartialFunction[A, A]): A = { + def childrenTransform(rule: PartialFunction[A, A]): A = { + var changed = false + val newArgs = productIterator.map { + case arg: TreeNode[_] if children.contains(arg) => + val newChild = arg.asInstanceOf[A].postOrderTransform(rule) + if (!(newChild fastEquals arg)) { + changed = true + newChild + } else { + arg + } + case args: Traversable[_] => args.map { + case arg: TreeNode[_] if children.contains(arg) => + val newChild = arg.asInstanceOf[A].postOrderTransform(rule) + if (!(newChild fastEquals arg)) { + changed = true + newChild + } else { + arg + } + case other => other + } + case nonChild: AnyRef => nonChild + case null => null + }.toArray + if (changed) makeCopy(newArgs) else this + } + + val afterChildren = childrenTransform(rule) + if (afterChildren fastEquals this) { + rule.applyOrElse(this, identity[A]) + } else { + rule.applyOrElse(afterChildren, identity[A]) + } + } + + /** + * Runs the given function first on the node and then recursively on all its children. + */ + def preOrderVisit(f: A => Unit): Unit = { + f(this) + children.foreach(_.preOrderVisit(f)) + } + + /** + * Creates a new copy of this expression with new children. This is used during transformation + * if children change. 
+ */ + def makeCopy(newArgs: Array[AnyRef]): A = { + val ctors = getClass.getConstructors.filter(_.getParameterTypes.size > 0) + if (ctors.isEmpty) { + throw new RuntimeException(s"No valid constructor for ${getClass.getSimpleName}") + } + + val defaultCtor = ctors.find { ctor => + if (ctor.getParameterTypes.size != newArgs.length) { + false + } else if (newArgs.contains(null)) { + false + } else { + val argsClasses: Array[Class[_]] = newArgs.map(_.getClass) + ClassUtils.isAssignable(argsClasses, ctor.getParameterTypes) + } + }.getOrElse(ctors.maxBy(_.getParameterTypes.size)) + + try { + defaultCtor.newInstance(newArgs: _*).asInstanceOf[A] + } catch { + case e: java.lang.IllegalArgumentException => + throw new IllegalArgumentException(s"Fail to copy treeNode ${getClass.getName}") + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala new file mode 100644 index 0000000..1da1d2c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.typeutils + +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation} +import org.apache.flink.api.table.validate._ + +object TypeCheckUtils { + + def assertNumericExpr(dataType: TypeInformation[_], caller: String): ExprValidationResult = { + if (dataType.isInstanceOf[NumericTypeInfo[_]]) { + ValidationSuccess + } else { + ValidationFailure(s"$caller requires numeric types, get $dataType here") + } + } + + def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ExprValidationResult = { + if (dataType.isSortKeyType) { + ValidationSuccess + } else { + ValidationFailure(s"$caller requires orderable types, get $dataType here") + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala new file mode 100644 index 0000000..218996d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation} + +/** + * Utilities for type conversions. + */ +object TypeCoercion { + + val numericWideningPrecedence: IndexedSeq[TypeInformation[_]] = + IndexedSeq( + BYTE_TYPE_INFO, + SHORT_TYPE_INFO, + INT_TYPE_INFO, + LONG_TYPE_INFO, + FLOAT_TYPE_INFO, + DOUBLE_TYPE_INFO) + + def widerTypeOf(tp1: TypeInformation[_], tp2: TypeInformation[_]): Option[TypeInformation[_]] = { + (tp1, tp2) match { + case (tp1, tp2) if tp1 == tp2 => Some(tp1) + + case (_, STRING_TYPE_INFO) => Some(STRING_TYPE_INFO) + case (STRING_TYPE_INFO, _) => Some(STRING_TYPE_INFO) + + case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) => + val higherIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2) + Some(numericWideningPrecedence(higherIndex)) + + case _ => None + } + } + + /** + * Test if we can do cast safely without loss of information.
+ */ + def canSafelyCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match { + case (_, STRING_TYPE_INFO) => true + + case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) => + if (numericWideningPrecedence.indexOf(from) < numericWideningPrecedence.indexOf(to)) { + true + } else { + false + } + + case _ => false + } + + /** + * All the supported cast types in flink-table. + * Note: This may lose information during the cast. + */ + def canCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match { + case (from, to) if from == to => true + + case (_, STRING_TYPE_INFO) => true + + case (_, DATE_TYPE_INFO) => false // Date type not supported yet. + case (_, VOID_TYPE_INFO) => false // Void type not supported + case (_, CHAR_TYPE_INFO) => false // Character type not supported. + + case (STRING_TYPE_INFO, _: NumericTypeInfo[_]) => true + case (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) => true + + case (BOOLEAN_TYPE_INFO, _: NumericTypeInfo[_]) => true + case (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) => true + + case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => true + + case _ => false + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala new file mode 100644 index 0000000..8571051 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ExprValidationResult.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.validate + +/** + * Represents the result of `Expression.validateInput`. + */ +sealed trait ExprValidationResult { + def isFailure: Boolean = !isSuccess + def isSuccess: Boolean +} + +/** + * Represents the successful result of `Expression.checkInputDataTypes`. + */ +object ValidationSuccess extends ExprValidationResult { + val isSuccess: Boolean = true +} + +/** + * Represents the failing result of `Expression.checkInputDataTypes`, + * with an error message to show the reason of failure.
+ */ +case class ValidationFailure(message: String) extends ExprValidationResult { + val isSuccess: Boolean = false +} http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala new file mode 100644 index 0000000..726917e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.validate + +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.ValidationException + +/** + * A catalog for looking up user defined functions, used during validation phase. 
+ */ +class FunctionCatalog { + + private val functionBuilders = mutable.HashMap.empty[String, Class[_]] + + def registerFunction(name: String, builder: Class[_]): Unit = + functionBuilders.put(name.toLowerCase, builder) + + /** + * Lookup and create an expression if we find a match. + */ + def lookupFunction(name: String, children: Seq[Expression]): Expression = { + val funcClass = functionBuilders.get(name.toLowerCase).getOrElse { + throw new ValidationException(s"undefined function $name") + } + withChildren(funcClass, children) + } + + /** + * Instantiate a function using the provided `children`. + */ + private def withChildren(func: Class[_], children: Seq[Expression]): Expression = { + // Try to find a constructor accepts `Seq[Expression]` + Try(func.getDeclaredConstructor(classOf[Seq[_]])) match { + case Success(seqCtor) => + Try(seqCtor.newInstance(children).asInstanceOf[Expression]) match { + case Success(expr) => expr + case Failure(e) => throw new ValidationException(e.getMessage) + } + case Failure(e) => + val childrenClass = Seq.fill(children.length)(classOf[Expression]) + // Try to find a constructor matching the exact number of children + Try(func.getDeclaredConstructor(childrenClass: _*)) match { + case Success(ctor) => + Try(ctor.newInstance(children: _*).asInstanceOf[Expression]) match { + case Success(expr) => expr + case Failure(e) => throw new ValidationException(e.getMessage) + } + case Failure(e) => + throw new ValidationException(s"Invalid number of arguments for function $func") + } + } + } + + /** + * Drop a function and return if the function existed. + */ + def dropFunction(name: String): Boolean = + functionBuilders.remove(name.toLowerCase).isDefined + + /** + * Drop all registered functions. 
+ */ + def clear(): Unit = functionBuilders.clear() +} + +object FunctionCatalog { + + val buildInFunctions: Map[String, Class[_]] = Map( + // aggregate functions + "avg" -> classOf[Avg], + "count" -> classOf[Count], + "max" -> classOf[Max], + "min" -> classOf[Min], + "sum" -> classOf[Sum], + + // string functions + "charLength" -> classOf[CharLength], + "initCap" -> classOf[InitCap], + "like" -> classOf[Like], + "lowerCase" -> classOf[Lower], + "similar" -> classOf[Similar], + "subString" -> classOf[SubString], + "trim" -> classOf[Trim], + "upperCase" -> classOf[Upper], + + // math functions + "abs" -> classOf[Abs], + "ceil" -> classOf[Ceil], + "exp" -> classOf[Exp], + "floor" -> classOf[Floor], + "log10" -> classOf[Log10], + "ln" -> classOf[Ln], + "power" -> classOf[Power], + "mod" -> classOf[Mod] + ) + + /** + * Create a new function catalog with build-in functions. + */ + def withBuildIns: FunctionCatalog = { + val catalog = new FunctionCatalog() + buildInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) } + catalog + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java index 0a2cf57..8fdb2da 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java @@ -24,10 +24,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; 
-import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.TableEnvironment; -import org.apache.flink.api.table.TableException; +import org.apache.flink.api.table.*; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.junit.Test; import org.junit.runner.RunWith; @@ -99,7 +96,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase { tableEnv.registerDataSet("MyTable", ds2); } - @Test(expected = TableException.class) + @Test(expected = ValidationException.class) public void testScanUnregisteredTable() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); @@ -127,7 +124,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = TableException.class) + @Test(expected = ValidationException.class) public void testIllegalName() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); @@ -138,7 +135,7 @@ public class TableEnvironmentITCase extends TableProgramsTestBase { tableEnv.registerTable("_DataSetTable_42", t); } - @Test(expected = TableException.class) + @Test(expected = ValidationException.class) public void testRegisterTableFromOtherEnv() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config()); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java index d4af08d..ddf5884 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java @@ -27,7 +27,7 @@ import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.table.TableEnvironment; -import org.apache.flink.api.table.plan.PlanGenException; +import org.apache.flink.api.table.ValidationException; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; @@ -59,7 +59,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testAggregationOnNonExistingField() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -141,7 +141,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = PlanGenException.class) + @Test(expected = ValidationException.class) public void testNonWorkingDataTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -161,7 +161,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = 
UnsupportedOperationException.class) + @Test(expected = ValidationException.class) public void testNoNestedAggregation() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java index 0246904..83c7cfd 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java @@ -30,6 +30,8 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.TableEnvironment; import org.apache.flink.api.table.codegen.CodeGenException; import static org.junit.Assert.fail; + +import org.apache.flink.api.table.ValidationException; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -154,7 +156,7 @@ public class ExpressionsITCase extends TableProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testEvalInvalidTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java index 2848347..7a2bedf 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.api.table.ValidationException; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.junit.Test; import org.junit.runner.RunWith; @@ -152,7 +153,7 @@ public class FilterITCase extends TableProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testFilterInvalidField() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java index 70dd793..1906040 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java +++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.api.table.ValidationException; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; @@ -40,7 +41,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { super(mode); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testGroupingOnNonExistentField() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -54,7 +55,7 @@ public class GroupedAggregationsITCase extends MultipleProgramsTestBase { .select("a.avg"); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testGroupingInvalidSelection() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java index 4c40596..e6db3b0 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java +++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.table.TableEnvironment; -import org.apache.flink.api.table.TableException; +import org.apache.flink.api.table.ValidationException; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; @@ -121,7 +121,7 @@ public class JoinITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testJoinNonExistingKey() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -136,7 +136,7 @@ public class JoinITCase extends MultipleProgramsTestBase { in1.join(in2).where("foo === e").select("c, g"); } - @Test(expected = TableException.class) + @Test(expected = ValidationException.class) public void testJoinWithNonMatchingKeyTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -154,7 +154,7 @@ public class JoinITCase extends MultipleProgramsTestBase { tableEnv.toDataSet(result, Row.class).collect(); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testJoinWithAmbiguousFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -189,7 +189,7 @@ public class JoinITCase extends MultipleProgramsTestBase { compareResultAsText(results, 
expected); } - @Test(expected = TableException.class) + @Test(expected = ValidationException.class) public void testJoinTablesFromDifferentEnvs() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java index 7c9478a..e48914c 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.api.table.ValidationException; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.junit.Test; import org.junit.runner.RunWith; @@ -103,7 +104,7 @@ public class SelectITCase extends TableProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testSelectInvalidField() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); @@ -115,7 +116,7 @@ public class SelectITCase extends TableProgramsTestBase { .select("a + 1, foo + 2"); } - @Test(expected = IllegalArgumentException.class) + @Test(expected 
= ValidationException.class) public void testSelectAmbiguousFieldNames() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java index e55bd22..db5eac9 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java @@ -21,10 +21,10 @@ package org.apache.flink.api.java.batch.table; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.api.table.ValidationException; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.codegen.CodeGenException; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.BatchTableEnvironment; @@ -64,6 +64,26 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { } @Test + public void testSubstringWithByteStart() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + + DataSet<Tuple2<String, Byte>> ds = env.fromElements( + new Tuple2<>("AAAA", (byte) 2), + new 
Tuple2<>("BBBB", (byte) 1)); + + Table in = tableEnv.fromDataSet(ds, "a, b"); + + Table result = in + .select("a.substring(1, b)"); + + DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); + List<Row> results = resultSet.collect(); + String expected = "AA\nB"; + compareResultAsText(results, expected); + } + + @Test public void testSubstringWithMaxEnd() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -83,7 +103,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = CodeGenException.class) + @Test(expected = ValidationException.class) public void testNonWorkingSubstring1() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -102,7 +122,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { resultSet.collect(); } - @Test(expected = CodeGenException.class) + @Test(expected = ValidationException.class) public void testNonWorkingSubstring2() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -121,7 +141,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { resultSet.collect(); } - @Test(expected = CodeGenException.class) + @Test(expected = ValidationException.class) public void testGeneratedCodeForStringComparison() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -132,7 +152,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class); } - @Test(expected = 
CodeGenException.class) + @Test(expected = ValidationException.class) public void testGeneratedCodeForIntegerEqualsComparison() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -143,7 +163,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class); } - @Test(expected = CodeGenException.class) + @Test(expected = ValidationException.class) public void testGeneratedCodeForIntegerGreaterComparison() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java index a7805f8..853cd7f 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.TableEnvironment; -import org.apache.flink.api.table.TableException; +import org.apache.flink.api.table.ValidationException; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; @@ -80,7 +80,7 @@ public class UnionITCase extends MultipleProgramsTestBase { 
compareResultAsText(results, expected); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testUnionIncompatibleNumberOfFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -95,7 +95,7 @@ public class UnionITCase extends MultipleProgramsTestBase { in1.unionAll(in2); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testUnionIncompatibleFieldsName() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -110,7 +110,7 @@ public class UnionITCase extends MultipleProgramsTestBase { in1.unionAll(in2); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = ValidationException.class) public void testUnionIncompatibleFieldTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); @@ -168,7 +168,7 @@ public class UnionITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = TableException.class) + @Test(expected = ValidationException.class) public void testUnionTablesFromDifferentEnvs() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env); http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala index 80a491b..c33e1ef 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, TableException} +import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -89,7 +89,7 @@ class TableEnvironmentITCase( tEnv.registerDataSet("MyTable", ds2) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testScanUnregisteredTable(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -130,7 +130,7 @@ class TableEnvironmentITCase( tEnv.registerDataSet("MyTable", t2) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testRegisterTableFromOtherEnv(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv1 = TableEnvironment.getTableEnvironment(env, config) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala index c34edbc..6c413e5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala @@ -21,8 +21,7 @@ package org.apache.flink.api.scala.batch.table import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.plan.PlanGenException -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.examples.scala.WordCountTable.{WC => MyWC} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} @@ -49,7 +48,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testAggregationOnNonExistingField(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -137,7 +136,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(result.asJava, expected) } - @Test(expected = classOf[PlanGenException]) + @Test(expected = classOf[ValidationException]) def testNonWorkingAggregationDataTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -150,7 +149,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa t.collect() } - @Test(expected = classOf[UnsupportedOperationException]) + @Test(expected = classOf[ValidationException]) def testNoNestedAggregations(): Unit = { val env = 
ExecutionEnvironment.getExecutionEnvironment http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala index fdbe3c9..9a0a035 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.codegen.CodeGenException import org.apache.flink.api.table.expressions.{Literal, Null} -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit.Assert._ @@ -144,7 +144,7 @@ class ExpressionsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testEvalInvalidTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala index 5deff9e..ee0356f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigM import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -174,7 +174,7 @@ class FilterITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testFilterInvalidFieldName(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) http://git-wip-us.apache.org/repos/asf/flink/blob/f0d543f8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala index cbd42b2..8889b37 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala @@ -21,7 +21,7 @@ package org.apache.flink.api.scala.batch.table import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.junit._ @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testGroupingOnNonExistentField(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -45,7 +45,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram .select('a.avg) } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[ValidationException]) def testGroupingInvalidSelection(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment