[FLINK-3223] Translate Table API calls to Calcite RelNodes.

This is an intermediate step to port the Table API on top of Calcite (FLINK-3221).

This commit:
- Adds Calcite as a dependency to flink-table.
- Translates Table API calls directly into Calcite RelNodes.
- Modifies tests to check only the translation into logical plans, not the execution of Table API queries.
- Deactivates a few tests that are not supported yet.
- Removes much of the former Table API translation code.
- Removes the bitwise operators from the Table API.
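To illustrate the new translation path: the commit registers the input DataSet as a table in a Calcite schema and then drives Calcite's RelBuilder, so a Table API program now produces a Calcite logical plan instead of Flink operators. The following standalone sketch (not part of the commit) shows the same pattern against calcite-core 1.5.0. It is illustrative only: the "orders" table, its row type, and the filter expression are invented here, and the commit registers a DataSetTable wrapping the input DataSet rather than the AbstractTable stub used below.

import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.{Frameworks, RelBuilder}

object RelBuilderSketch {
  def main(args: Array[String]): Unit = {
    // Register a table in a Calcite root schema. The commit does the same in
    // JavaBatchTranslator.createTable(), but with a DataSetTable wrapping the
    // input DataSet; this stub only contributes a row type for validation.
    val schema = Frameworks.createRootSchema(true)
    schema.add("orders", new AbstractTable {
      override def getRowType(typeFactory: RelDataTypeFactory): RelDataType =
        typeFactory.builder()
          .add("id", SqlTypeName.BIGINT)
          .add("amount", SqlTypeName.INTEGER)
          .build()
    })

    // Initialize a RelBuilder on that schema, as createTable() does.
    val config = Frameworks.newConfigBuilder().defaultSchema(schema).build()
    val relBuilder = RelBuilder.create(config)

    // A Table API chain such as table.filter('amount > 100).select('id)
    // now maps directly onto RelBuilder calls that build a logical plan.
    val plan: RelNode = relBuilder
      .scan("orders")
      .filter(relBuilder.call(
        SqlStdOperatorTable.GREATER_THAN,
        relBuilder.field("amount"),
        relBuilder.literal(100)))
      .project(relBuilder.field("id"))
      .build()

    // Printing the logical plan is all that translate() does at this stage.
    println(RelOptUtil.toString(plan))
  }
}

Running the sketch should print a LogicalProject / LogicalFilter / TableScan tree; optimizing that plan and translating it back into a DataSet program are still marked TODO in translate() below.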
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4ee0c6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4ee0c6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4ee0c6d

Branch: refs/heads/tableOnCalcite
Commit: e4ee0c6d7393b1d555b08c8f515cf9724d25c7fb
Parents: a65cd8d
Author: Fabian Hueske <fhue...@apache.org>
Authored: Fri Jan 15 17:46:39 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Sat Jan 23 10:45:58 2016 +0100

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml | 6 +
 .../api/java/table/JavaBatchTranslator.scala | 339 ++-----------------
 .../java/table/JavaStreamingTranslator.scala | 241 -------------
 .../flink/api/java/table/TableEnvironment.scala | 43 +--
 .../api/scala/table/DataStreamConversions.scala | 68 ----
 .../api/scala/table/ScalaBatchTranslator.scala | 26 +-
 .../scala/table/ScalaStreamingTranslator.scala | 58 ----
 .../api/scala/table/TableConversions.scala | 12 +-
 .../flink/api/scala/table/expressionDsl.scala | 5 -
 .../apache/flink/api/scala/table/package.scala | 21 +-
 .../org/apache/flink/api/table/Table.scala | 236 +++++++++----
 .../table/codegen/ExpressionCodeGenerator.scala | 35 --
 .../api/table/expressions/aggregations.scala | 40 +--
 .../analysis/ExtractEquiJoinFields.scala | 70 ----
 .../expressions/analysis/GroupByAnalyzer.scala | 51 ---
 .../expressions/analysis/InsertAutoCasts.scala | 92 -----
 .../analysis/PredicateAnalyzer.scala | 35 --
 .../analysis/ResolveFieldReferences.scala | 60 ----
 .../analysis/SelectionAnalyzer.scala | 36 --
 .../table/expressions/analysis/TypeCheck.scala | 57 ----
 .../expressions/analysis/VerifyBoolean.scala | 41 ---
 .../analysis/VerifyNoAggregates.scala | 53 ---
 .../analysis/VerifyNoNestedAggregates.scala | 54 ---
 .../api/table/expressions/arithmetic.scala | 56 +--
 .../flink/api/table/expressions/cast.scala | 5 +
 .../api/table/expressions/fieldExpression.scala | 5 +
 .../api/table/parser/ExpressionParser.scala | 16 +-
 .../api/table/plan/ExpandAggregations.scala | 147 --------
 .../flink/api/table/plan/PlanTranslator.scala | 133 ++------
 .../api/table/plan/RexNodeTranslator.scala | 184 ++++++++++
 .../flink/api/table/plan/TypeConverter.scala | 54 +++
 .../flink/api/table/plan/operations.scala | 134 --------
 .../api/table/plan/operators/DataSetTable.scala | 66 ++++
 .../apache/flink/api/table/plan/package.scala | 24 --
 .../apache/flink/api/table/trees/Analyzer.scala | 43 ---
 .../org/apache/flink/api/table/trees/Rule.scala | 30 --
 .../examples/scala/StreamingTableFilter.scala | 90 -----
 .../api/java/table/test/AggregationsITCase.java | 67 ++--
 .../flink/api/java/table/test/AsITCase.java | 68 ++--
 .../api/java/table/test/CastingITCase.java | 62 ++--
 .../api/java/table/test/ExpressionsITCase.java | 83 +---
 .../flink/api/java/table/test/FilterITCase.java | 58 ++--
 .../table/test/GroupedAggregationsITCase.java | 35 +-
 .../flink/api/java/table/test/JoinITCase.java | 70 ++--
 .../api/java/table/test/PojoGroupingITCase.java | 18 +-
 .../flink/api/java/table/test/SelectITCase.java | 70 ++--
 .../api/java/table/test/SqlExplainITCase.java | 7 +
 .../table/test/StringExpressionsITCase.java | 45 +--
 .../flink/api/java/table/test/UnionITCase.java | 44 +--
 .../scala/table/test/PageRankTableITCase.java | 7 +-
 .../scala/table/test/TypeExceptionTest.scala | 42 ---
 .../scala/table/test/AggregationsITCase.scala | 104 +++---
 .../flink/api/scala/table/test/AsITCase.scala | 74 ++--
 .../api/scala/table/test/CastingITCase.scala | 57 ++--
 .../scala/table/test/ExpressionsITCase.scala | 90 ++---
 .../api/scala/table/test/FilterITCase.scala | 59 ++--
 .../table/test/GroupedAggreagationsITCase.scala | 115 -------
 .../table/test/GroupedAggregationsITCase.scala | 97 ++++++
 .../flink/api/scala/table/test/JoinITCase.scala | 73 ++--
 .../api/scala/table/test/SelectITCase.scala | 99 +++---
 .../api/scala/table/test/SqlExplainITCase.scala | 198 +++++------
 .../table/test/StringExpressionsITCase.scala | 52 +--
 .../api/scala/table/test/UnionITCase.scala | 34 +-
 63 files changed, 1389 insertions(+), 3005 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 19951cb..818b537 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -100,6 +100,12 @@ under the License. <version>${jackson.version}</version> </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <version>1.5.0</version> + </dependency> + </dependencies> <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 9dc9297..f3f4e9d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -18,22 +18,14 @@ package org.apache.flink.api.java.table -import java.lang.reflect.Modifier - -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.RelNode +import org.apache.calcite.tools.{RelBuilder, Frameworks} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.aggregation.AggregationFunction -import org.apache.flink.api.java.operators.JoinOperator.EquiJoin -import org.apache.flink.api.java.operators.Keys.ExpressionKeys -import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping} import org.apache.flink.api.java.{DataSet => JavaDataSet} -import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.runtime._ -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo} -import org.apache.flink.api.table.{ExpressionException, Row, Table} +import org.apache.flink.api.table.plan.operators.DataSetTable +import org.apache.flink.api.table.Table /** * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and @@ -45,302 +37,41 @@ class JavaBatchTranslator extends PlanTranslator { override def createTable[A]( repr: Representation[A], - inputType: CompositeType[A], -
expressions: Array[Expression], - resultFields: Seq[(String, TypeInformation[_])]): Table = { - - val rowDataSet = createSelect(expressions, repr, inputType) - - Table(Root(rowDataSet, resultFields)) + fieldNames: Array[String]): Table = { + + // create table representation from DataSet + val dataSetTable = new DataSetTable[A]( + repr.asInstanceOf[JavaDataSet[A]], + fieldNames + ) + + // register table in Calcite schema + val schema = Frameworks.createRootSchema(true) + val tableName = repr.hashCode().toString + schema.add(tableName, dataSetTable) + + // initialize RelBuilder + val frameworkConfig = Frameworks + .newConfigBuilder + .defaultSchema(schema) + .build + val relBuilder = RelBuilder.create(frameworkConfig) + + // create table scan operator + relBuilder.scan(tableName) + new Table(relBuilder.build(), relBuilder) } - override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = { - - if (tpe.getTypeClass == classOf[Row]) { - // shortcut for DataSet[Row] - return translateInternal(op).asInstanceOf[JavaDataSet[A]] - } - - val clazz = tpe.getTypeClass - if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { - throw new ExpressionException("Cannot create DataSet of type " + - clazz.getName + ". Only top-level classes or static member classes are supported.") - } - - - if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { - throw new ExpressionException( - "A Table can only be converted to composite types, type is: " + - implicitly[TypeInformation[A]] + - ". Composite types would be tuples, case classes and POJOs.") - } - - val resultSet = translateInternal(op) - - val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] + override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = { - - val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] + println(RelOptUtil.toString(lPlan)) - val resultNames = resultType.getFieldNames - val outputNames = outputType.getFieldNames.toSeq + // TODO: optimize & translate: + // - optimize RelNode plan + // - translate to Flink RelNode plan + // - generate DataSet program - if (resultNames.toSet != outputNames.toSet) { - throw new ExpressionException(s"Expression result type $resultType does not have the same " + - s"fields as output type $outputType") - } - - for (f <- outputNames) { - val in = resultType.getTypeAt(resultType.getFieldIndex(f)) - val out = outputType.getTypeAt(outputType.getFieldIndex(f)) - if (!in.equals(out)) { - throw new ExpressionException(s"Types for field $f differ on input $resultType and " + - s"output $outputType.") - } - } - - val outputFields = outputNames map { - f => ResolvedFieldReference(f, resultType.getTypeAt(f)) - } - - val function = new ExpressionSelectFunction( - resultSet.getType.asInstanceOf[RowTypeInfo], - outputType, - outputFields) - - val opName = s"select(${outputFields.mkString(",")})" - val operator = new MapOperator(resultSet, outputType, function, opName) - - operator + null } - private def translateInternal(op: PlanNode): JavaDataSet[Row] = { - op match { - case Root(dataSet: JavaDataSet[Row], resultFields) => - dataSet - - case Root(_, _) => - throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op + ". " + - "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?") - - case GroupBy(_, fields) => - throw new ExpressionException("Dangling GroupBy operation. 
Did you forget a " + - "SELECT statement?") - - case As(input, newNames) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray) - new RenameOperator(translatedInput, proxyType) - - case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - selection, - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - } else { - translateInternal(expandedInput) - } - - case Filter(Join(leftInput, rightInput), predicate) => - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ - rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - - case Join(leftInput, rightInput) => - throw new ExpressionException("Join without filter condition encountered. " + - "Did you forget to add .where(...) ?") - - case sel@Select(input, selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - val translatedInput = input match { - case GroupBy(groupByInput, groupExpressions) => - val translatedGroupByInput = translateInternal(groupByInput) - val inType = translatedGroupByInput.getType.asInstanceOf[CompositeType[Row]] - - val keyIndices = groupExpressions map { - case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name) - case e => - throw new ExpressionException(s"Expression $e is not a valid key expression.") - } - - val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false) - val grouping = new UnsortedGrouping(translatedGroupByInput, keys) - - new GroupReduceOperator( - grouping, - inType, - new NoExpressionAggregateFunction(), - "Nop Expression Aggregation") - - case _ => translateInternal(input) - } - - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val inputFields = inType.getFieldNames - createSelect( - selection, - translatedInput, - inType) - } else { - translateInternal(expandedInput) - } - - case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - - val keyIndices = groupExpressions map { - case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name) - case e => throw new ExpressionException(s"Expression $e is not a valid key expression.") - } - - val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false) - - val grouping = new UnsortedGrouping(translatedInput, keys) - - val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { - case (fieldName, fun) => - fun.getFactory.createAggregationFunction[Any]( - 
inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) - } - - val aggIndices = aggregations map { - case (fieldName, _) => - inType.getFieldIndex(fieldName) - } - - val result = new GroupReduceOperator( - grouping, - inType, - new ExpressionAggregateFunction(aggIndices, aggFunctions), - "Expression Aggregation: " + agg) - - result - - case agg@Aggregate(input, aggregations) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - - val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { - case (fieldName, fun) => - fun.getFactory.createAggregationFunction[Any]( - inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) - } - - val aggIndices = aggregations map { - case (fieldName, _) => - inType.getFieldIndex(fieldName) - } - - val result = new GroupReduceOperator( - translatedInput, - inType, - new ExpressionAggregateFunction(aggIndices, aggFunctions), - "Expression Aggregation: " + agg) - - result - - - case Filter(input, predicate) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val filter = new ExpressionFilterFunction[Row](predicate, inType) - translatedInput.filter(filter).name(predicate.toString) - - case uni@UnionAll(left, right) => - val translatedLeft = translateInternal(left) - val translatedRight = translateInternal(right) - translatedLeft.union(translatedRight).name("Union: " + uni) - } - } - - private def createSelect[I]( - fields: Seq[Expression], - input: JavaDataSet[I], - inputType: CompositeType[I]): JavaDataSet[Row] = { - - fields foreach { - f => - if (f.exists(_.isInstanceOf[Aggregation])) { - throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") - } - - } - - val resultType = new RowTypeInfo(fields) - - val function = new ExpressionSelectFunction(inputType, resultType, fields) - - val opName = s"select(${fields.mkString(",")})" - val operator = new MapOperator(input, resultType, function, opName) - - operator - } - - private def createJoin[L, R]( - predicate: Expression, - fields: Seq[Expression], - leftInput: JavaDataSet[L], - rightInput: JavaDataSet[R], - leftType: CompositeType[L], - rightType: CompositeType[R], - joinHint: JoinHint): JavaDataSet[Row] = { - - val resultType = new RowTypeInfo(fields) - - val (reducedPredicate, leftFields, rightFields) = - ExtractEquiJoinFields(leftType, rightType, predicate) - - if (leftFields.isEmpty || rightFields.isEmpty) { - throw new ExpressionException("Could not derive equi-join predicates " + - "for predicate " + predicate + ".") - } - - val leftKey = new ExpressionKeys[L](leftFields, leftType) - val rightKey = new ExpressionKeys[R](rightFields, rightType) - - val joiner = new ExpressionJoinFunction[L, R, Row]( - reducedPredicate, - leftType, - rightType, - resultType, - fields) - - new EquiJoin[L, R, Row]( - leftInput, - rightInput, - leftKey, - rightKey, - joiner, - resultType, - joinHint, - predicate.toString) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala deleted file mode 100644 index 
a37c892..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table - -import java.lang.reflect.Modifier -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, ExpressionSelectFunction} -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.table.typeinfo.RowTypeInfo -import org.apache.flink.api.table.{ExpressionException, Row, Table} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.operators.StreamMap - -/** - * [[PlanTranslator]] for creating [[Table]]s from Java [[DataStream]]s and - * translating them back to Java [[DataStream]]s. - * - * This is very limited right now. Only select and filter are implemented. Also, the expression - * operations must be extended to allow windowing operations. - */ - -class JavaStreamingTranslator extends PlanTranslator { - - type Representation[A] = DataStream[A] - - override def createTable[A]( - repr: Representation[A], - inputType: CompositeType[A], - expressions: Array[Expression], - resultFields: Seq[(String, TypeInformation[_])]): Table = { - - val rowDataStream = createSelect(expressions, repr, inputType) - - new Table(Root(rowDataStream, resultFields)) - } - - override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): DataStream[A] = { - - if (tpe.getTypeClass == classOf[Row]) { - // shortcut for DataSet[Row] - return translateInternal(op).asInstanceOf[DataStream[A]] - } - - val clazz = tpe.getTypeClass - if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { - throw new ExpressionException("Cannot create DataStream of type " + - clazz.getName + ". Only top-level classes or static member classes are supported.") - } - - if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { - throw new ExpressionException( - "A Table can only be converted to composite types, type is: " + - implicitly[TypeInformation[A]] + - ". 
Composite types would be tuples, case classes and POJOs.") - - } - - val resultSet = translateInternal(op) - - val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] - - val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] - - val resultNames = resultType.getFieldNames - val outputNames = outputType.getFieldNames.toSeq - - if (resultNames.toSet != outputNames.toSet) { - throw new ExpressionException(s"Expression result type $resultType does not have the same" + - s"fields as output type $outputType") - } - - for (f <- outputNames) { - val in = resultType.getTypeAt(resultType.getFieldIndex(f)) - val out = outputType.getTypeAt(outputType.getFieldIndex(f)) - if (!in.equals(out)) { - throw new ExpressionException(s"Types for field $f differ on input $resultType and " + - s"output $outputType.") - } - } - - val outputFields = outputNames map { - f => ResolvedFieldReference(f, resultType.getTypeAt(f)) - } - - val function = new ExpressionSelectFunction( - resultSet.getType.asInstanceOf[RowTypeInfo], - outputType, - outputFields) - - val opName = s"select(${outputFields.mkString(",")})" - - resultSet.transform(opName, outputType, new StreamMap[Row, A](function)) - } - - private def translateInternal(op: PlanNode): DataStream[Row] = { - op match { - case Root(dataSet: DataStream[Row], resultFields) => - dataSet - - case Root(_, _) => - throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op + ". " + - "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?") - - case GroupBy(_, fields) => - throw new ExpressionException("Dangling GroupBy operation. Did you forget a " + - "SELECT statement?") - - case As(input, newNames) => - throw new ExpressionException("As operation for Streams not yet implemented.") - - case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - selection, - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - } else { - translateInternal(expandedInput) - } - - case Filter(Join(leftInput, rightInput), predicate) => - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ - rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - - case Join(leftInput, rightInput) => - throw new ExpressionException("Join without filter condition encountered. " + - "Did you forget to add .where(...) 
?") - - case sel@Select(input, selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - // no expansions took place - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val inputFields = inType.getFieldNames - createSelect( - selection, - translatedInput, - inType) - } else { - translateInternal(expandedInput) - } - - case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => - throw new ExpressionException("Aggregate operation for Streams not yet implemented.") - - case agg@Aggregate(input, aggregations) => - throw new ExpressionException("Aggregate operation for Streams not yet implemented.") - - case Filter(input, predicate) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val filter = new ExpressionFilterFunction[Row](predicate, inType) - translatedInput.filter(filter) - - case UnionAll(left, right) => - val translatedLeft = translateInternal(left) - val translatedRight = translateInternal(right) - translatedLeft.union(translatedRight) - } - } - - private def createSelect[I]( - fields: Seq[Expression], - input: DataStream[I], - inputType: CompositeType[I]): DataStream[Row] = { - - fields foreach { - f => - if (f.exists(_.isInstanceOf[Aggregation])) { - throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") - } - - } - - val resultType = new RowTypeInfo(fields) - - val function = new ExpressionSelectFunction(inputType, resultType, fields) - - val opName = s"select(${fields.mkString(",")})" - - input.transform(opName, resultType, new StreamMap[I, Row](function)) - } - - private def createJoin[L, R]( - predicate: Expression, - fields: Seq[Expression], - leftInput: DataStream[L], - rightInput: DataStream[R], - leftType: CompositeType[L], - rightType: CompositeType[R], - joinHint: JoinHint): DataStream[Row] = { - - throw new ExpressionException("Join operation for Streams not yet implemented.") - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala index 5614031..01e38db 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala @@ -21,12 +21,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.table.Table -import org.apache.flink.streaming.api.datastream.DataStream /** * Environment for working with the Table API. * - * This can be used to convert [[DataSet]] or [[DataStream]] to a [[Table]] and back again. You + * This can be used to convert a [[DataSet]] to a [[Table]] and back again. You * can also use the provided methods to create a [[Table]] directly from a data source. */ class TableEnvironment { @@ -58,32 +57,6 @@ class TableEnvironment { } /** - * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]]. 
- * The fields of the DataStream type are renamed to the given set of fields: - * - * Example: - * - * {{{ - * tableEnv.fromDataStream(set, "a, b") - * }}} - * - * This will transform the set containing elements of two fields to a table where the fields - * are named a and b. - */ - def fromDataStream[T](set: DataStream[T], fields: String): Table = { - new JavaStreamingTranslator().createTable(set, fields) - } - - /** - * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]]. - * The fields of the DataStream type are used to name the - * [[org.apache.flink.api.table.Table]] fields. - */ - def fromDataStream[T](set: DataStream[T]): Table = { - new JavaStreamingTranslator().createTable(set) - } - - /** * Converts the given [[org.apache.flink.api.table.Table]] to * a DataSet. The given type must have exactly the same fields as the * [[org.apache.flink.api.table.Table]]. That is, the names of the @@ -91,21 +64,9 @@ class TableEnvironment { */ @SuppressWarnings(Array("unchecked")) def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = { - new JavaBatchTranslator().translate[T](table.operation)( + new JavaBatchTranslator().translate[T](table.relNode)( TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]]) } - /** - * Converts the given [[org.apache.flink.api.table.Table]] to - * a DataStream. The given type must have exactly the same fields as the - * [[org.apache.flink.api.table.Table]]. That is, the names of the - * fields and the types must match. - */ - @SuppressWarnings(Array("unchecked")) - def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { - new JavaStreamingTranslator().translate[T](table.operation)( - TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]]) - - } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala deleted file mode 100644 index 47bd100..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.table - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table._ -import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference} -import org.apache.flink.streaming.api.scala.DataStream - -class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) { - - /** - * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val table = in.as('a, 'b) - * }}} - * - * This results in a [[Table]] that has field `a` of type `String` and field `b` - * of type `Int`. - */ - - def as(fields: Expression*): Table = { - new ScalaStreamingTranslator().createTable( - stream, - fields.toArray, - checkDeterministicFields = true) - } - - /** - * Converts the [[DataStream]] to a [[Table]]. The field names will be taken from the field - * names of the input type. - * - * Example: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val table = in.toTable - * }}} - * - * This results in a [[Table]] that has field `_1` of type `String` and field `_2` - * of type `Int`. - */ - - def toTable: Table = { - val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) - as(resultFields: _*) - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala index cdcf53e..1c453fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala @@ -18,10 +18,8 @@ package org.apache.flink.api.scala.table - -import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.calcite.rel.RelNode import org.apache.flink.api.java.table.JavaBatchTranslator -import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.scala.wrap import org.apache.flink.api.table.plan._ import org.apache.flink.api.table.Table @@ -30,7 +28,6 @@ import org.apache.flink.api.scala.DataSet import scala.reflect.ClassTag - /** * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and * translating them back to Scala [[DataSet]]s. @@ -41,28 +38,13 @@ class ScalaBatchTranslator extends PlanTranslator { type Representation[A] = DataSet[A] - def createTable[A]( - repr: DataSet[A], - fields: Array[Expression]): Table = { - - val result = javaTranslator.createTable(repr.javaSet, fields) - - new Table(result.operation) + override def createTable[A](repr: Representation[A], fieldNames: Array[String]): Table = { + javaTranslator.createTable(repr.javaSet, fieldNames) } - override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataSet[O] = { + override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): DataSet[O] = { // fake it till you make it ... 
wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]]) } - override def createTable[A]( - repr: Representation[A], - inputType: CompositeType[A], - expressions: Array[Expression], - resultFields: Seq[(String, TypeInformation[_])]): Table = { - - val result = javaTranslator.createTable(repr.javaSet, inputType, expressions, resultFields) - - Table(result.operation) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala deleted file mode 100644 index 88f1b83..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.table.JavaStreamingTranslator -import org.apache.flink.api.table.Table -import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream} - -/** - * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and - * translating them back to Scala [[DataStream]]s. - * - * This is very limited right now. Only select and filter are implemented. Also, the expression - * operations must be extended to allow windowing operations. - */ -class ScalaStreamingTranslator extends PlanTranslator { - - private val javaTranslator = new JavaStreamingTranslator - - override type Representation[A] = DataStream[A] - - override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = { - // fake it till you make it ... 
- javaToScalaStream(javaTranslator.translate(op)) - } - - override def createTable[A]( - repr: Representation[A], - inputType: CompositeType[A], - expressions: Array[Expression], - resultFields: Seq[(String, TypeInformation[_])]): Table = { - - val result = - javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields) - - new Table(result.operation) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala index 4f2172e..fdcd804 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala @@ -22,10 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.api.table._ -import org.apache.flink.streaming.api.scala.DataStream - /** - * Methods for converting a [[Table]] to a [[DataSet]] or [[DataStream]]. A [[Table]] is + * Methods for converting a [[Table]] to a [[DataSet]]. A [[Table]] is * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]]. */ class TableConversions(table: Table) { @@ -34,14 +32,8 @@ class TableConversions(table: Table) { * Converts the [[Table]] to a [[DataSet]]. */ def toDataSet[T: TypeInformation]: DataSet[T] = { - new ScalaBatchTranslator().translate[T](table.operation) + new ScalaBatchTranslator().translate[T](table.relNode) } - /** - * Converts the [[Table]] to a [[DataStream]]. 
- */ - def toDataStream[T: TypeInformation]: DataStream[T] = { - new ScalaStreamingTranslator().translate[T](table.operation) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index 0be6be2..058ff0e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -55,11 +55,6 @@ trait ImplicitExpressionOperations { def * (other: Expression) = Mul(expr, other) def % (other: Expression) = Mod(expr, other) - def & (other: Expression) = BitwiseAnd(expr, other) - def | (other: Expression) = BitwiseOr(expr, other) - def ^ (other: Expression) = BitwiseXor(expr, other) - def unary_~ = BitwiseNot(expr) - def abs = Abs(expr) def sum = Sum(expr) http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala index e74651b..86bb7c0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala @@ -19,7 +19,6 @@ package org.apache.flink.api.scala import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.table.{Row, Table} -import org.apache.flink.streaming.api.scala.DataStream import scala.language.implicitConversions @@ -32,7 +31,7 @@ import scala.language.implicitConversions * import org.apache.flink.api.scala.table._ * }}} * - * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to a + * imports implicit conversions for converting a [[DataSet]] to a * [[Table]]. This can be used to perform SQL-like queries on data. 
Please have * a look at [[Table]] to see which operations are supported and * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an @@ -78,7 +77,7 @@ package object table extends ImplicitExpressionConversions { implicit def table2RowDataSet( table: Table): DataSet[Row] = { - new ScalaBatchTranslator().translate[Row](table.operation) + new ScalaBatchTranslator().translate[Row](table.relNode) } implicit def rowDataSet2Table( @@ -86,20 +85,4 @@ package object table extends ImplicitExpressionConversions { rowDataSet.toTable } - implicit def dataStream2DataSetConversions[T]( - stream: DataStream[T]): DataStreamConversions[T] = { - new DataStreamConversions[T]( - stream, - stream.getJavaStream.getType.asInstanceOf[CompositeType[T]]) - } - - implicit def table2RowDataStream( - table: Table): DataStream[Row] = { - new ScalaStreamingTranslator().translate[Row](table.operation) - } - - implicit def rowDataStream2Table( - rowDataStream: DataStream[Row]): Table = { - rowDataStream.toTable - } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala index 641f2fa..271aa99 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala @@ -17,14 +17,22 @@ */ package org.apache.flink.api.table -import org.apache.flink.api.java.io.DiscardingOutputFormat -import org.apache.flink.api.table.explain.PlanJsonParser -import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer} -import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeField +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex.RexNode +import org.apache.calcite.tools.RelBuilder +import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} +import org.apache.flink.api.table.plan.RexNodeTranslator +import RexNodeTranslator.{toRexNode, extractAggCalls} +import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.parser.ExpressionParser -import org.apache.flink.api.table.plan._ -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ + +import scala.collection.JavaConverters._ + +case class BaseTable( + private[flink] val relNode: RelNode, + private[flink] val relBuilder: RelBuilder) /** * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs @@ -50,7 +58,11 @@ import org.apache.flink.api.scala.table._ * in a Scala DSL or as an expression String. Please refer to the documentation for the expression * syntax. */ -case class Table(private[flink] val operation: PlanNode) { +class Table( + private[flink] override val relNode: RelNode, + private[flink] override val relBuilder: RelBuilder) + extends BaseTable(relNode, relBuilder) +{ /** * Performs a selection operation. Similar to an SQL SELECT statement. 
The field expressions @@ -63,14 +75,30 @@ case class Table(private[flink] val operation: PlanNode) { * }}} */ def select(fields: Expression*): Table = { - val analyzer = new SelectionAnalyzer(operation.outputFields) - val analyzedFields = fields.map(analyzer.analyze) - val fieldNames = analyzedFields map(_.name) - if (fieldNames.toSet.size != fieldNames.size) { - throw new ExpressionException(s"Resulting fields names are not unique in expression" + - s""" "${fields.mkString(", ")}".""") + + relBuilder.push(relNode) + + // separate aggregations and selection expressions + val extractedAggCalls: List[(Expression, List[AggCall])] = fields + .map(extractAggCalls(_, relBuilder)).toList + + // get aggregation calls + val aggCalls: List[AggCall] = extractedAggCalls + .map(_._2).reduce( (x,y) => x ::: y) + + // apply aggregations + if (aggCalls.nonEmpty) { + val emptyKey: GroupKey = relBuilder.groupKey() + relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava) } - this.copy(operation = Select(operation, analyzedFields)) + + // get selection expressions + val exprs: List[RexNode] = extractedAggCalls + .map(_._1) + .map(toRexNode(_, relBuilder)) + + relBuilder.project(exprs.toIterable.asJava) + new Table(relBuilder.build(), relBuilder) } /** @@ -99,13 +127,12 @@ case class Table(private[flink] val operation: PlanNode) { * }}} */ def as(fields: Expression*): Table = { - fields forall { - f => f.isInstanceOf[UnresolvedFieldReference] - } match { - case true => - case false => throw new ExpressionException("Only field expression allowed in as().") - } - this.copy(operation = As(operation, fields.toArray map { _.name })) + + relBuilder.push(relNode) + val expressions = fields.map(toRexNode(_, relBuilder)).toIterable.asJava + val names = fields.map(_.name).toIterable.asJava + relBuilder.project(expressions, names) + new Table(relBuilder.build(), relBuilder) } /** @@ -134,9 +161,11 @@ case class Table(private[flink] val operation: PlanNode) { * }}} */ def filter(predicate: Expression): Table = { - val analyzer = new PredicateAnalyzer(operation.outputFields) - val analyzedPredicate = analyzer.analyze(predicate) - this.copy(operation = Filter(operation, analyzedPredicate)) + + relBuilder.push(relNode) + val pred = toRexNode(predicate, relBuilder) + relBuilder.filter(pred) + new Table(relBuilder.build(), relBuilder) } /** @@ -192,20 +221,13 @@ case class Table(private[flink] val operation: PlanNode) { * in.groupBy('key).select('key, 'value.avg) * }}} */ - def groupBy(fields: Expression*): Table = { - val analyzer = new GroupByAnalyzer(operation.outputFields) - val analyzedFields = fields.map(analyzer.analyze) + def groupBy(fields: Expression*): GroupedTable = { - val illegalKeys = analyzedFields filter { - case fe: ResolvedFieldReference => false // OK - case e => true - } + relBuilder.push(relNode) + val groupExpr = fields.map(toRexNode(_, relBuilder)).toIterable.asJava + val groupKey = relBuilder.groupKey(groupExpr) - if (illegalKeys.nonEmpty) { - throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", ")) - } - - this.copy(operation = GroupBy(operation, analyzedFields)) + new GroupedTable(relBuilder.build(), relBuilder, groupKey) } /** @@ -218,7 +240,7 @@ case class Table(private[flink] val operation: PlanNode) { * in.groupBy("key").select("key, value.avg") * }}} */ - def groupBy(fields: String): Table = { + def groupBy(fields: String): GroupedTable = { val fieldsExpr = ExpressionParser.parseExpressionList(fields) groupBy(fieldsExpr: _*) } @@ -235,16 +257,21 @@ case class 
Table(private[flink] val operation: PlanNode) { * }}} */ def join(right: Table): Table = { - val leftInputNames = operation.outputFields.map(_._1).toSet - val rightInputNames = right.operation.outputFields.map(_._1).toSet - if (leftInputNames.intersect(rightInputNames).nonEmpty) { - throw new ExpressionException( - "Overlapping fields names on join input, result would be ambiguous: " + - operation.outputFields.mkString(", ") + - " and " + - right.operation.outputFields.mkString(", ") ) + + // check that join inputs do not have overlapping field names + val leftFields = relNode.getRowType.getFieldNames.asScala.toSet + val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet + if (leftFields.intersect(rightFields).nonEmpty) { + throw new IllegalArgumentException("Overlapping fields names on join input.") } - this.copy(operation = Join(operation, right.operation)) + + relBuilder.push(relNode) + relBuilder.push(right.relNode) + + relBuilder.join(JoinRelType.INNER, relBuilder.literal(true)) + val join = relBuilder.build() + val rowT = join.getRowType() + new Table(join, relBuilder) } /** @@ -258,17 +285,27 @@ case class Table(private[flink] val operation: PlanNode) { * }}} */ def unionAll(right: Table): Table = { - val leftInputFields = operation.outputFields - val rightInputFields = right.operation.outputFields - if (!leftInputFields.equals(rightInputFields)) { - throw new ExpressionException( - "The fields names of join inputs should be fully overlapped, left inputs fields:" + - operation.outputFields.mkString(", ") + - " and right inputs fields" + - right.operation.outputFields.mkString(", ") - ) + + val leftRowType: List[RelDataTypeField] = relNode.getRowType.getFieldList.asScala.toList + val rightRowType: List[RelDataTypeField] = right.relNode.getRowType.getFieldList.asScala.toList + + if (leftRowType.length != rightRowType.length) { + throw new IllegalArgumentException("Unioned tables have varying row schema.") + } + else { + val zipped: List[(RelDataTypeField, RelDataTypeField)] = leftRowType.zip(rightRowType) + zipped.foreach { case (x, y) => + if (!x.getName.equals(y.getName) || x.getType != y.getType) { + throw new IllegalArgumentException("Unioned tables have varying row schema.") + } + } } - this.copy(operation = UnionAll(operation, right.operation)) + + relBuilder.push(relNode) + relBuilder.push(right.relNode) + + relBuilder.union(true) + new Table(relBuilder.build(), relBuilder) } /** @@ -277,18 +314,79 @@ case class Table(private[flink] val operation: PlanNode) { * referenced by the statement will be scanned. 
*/ def explain(extended: Boolean): String = { - val ast = operation - val dataSet = this.toDataSet[Row] - val env = dataSet.getExecutionEnvironment - dataSet.output(new DiscardingOutputFormat[Row]) - val jasonSqlPlan = env.getExecutionPlan() - val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) - val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" + - "\n" + sqlPlan - return result + + // TODO: enable once toDataSet() is working again + +// val ast = operation +// val dataSet = this.toDataSet[Row] +// val env = dataSet.getExecutionEnvironment +// dataSet.output(new DiscardingOutputFormat[Row]) +// val jasonSqlPlan = env.getExecutionPlan() +// val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) +// val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" + +// "\n" + sqlPlan +// return result + + "" } - + def explain(): String = explain(false) - - override def toString: String = s"Expression($operation)" +} + +class GroupedTable( + private[flink] override val relNode: RelNode, + private[flink] override val relBuilder: RelBuilder, + private[flink] val groupKey: GroupKey) extends BaseTable(relNode, relBuilder) { + + /** + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. + * + * Example: + * + * {{{ + * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10)) + * }}} + */ + def select(fields: Expression*): Table = { + + relBuilder.push(relNode) + + // separate aggregations and selection expressions + val extractedAggCalls: List[(Expression, List[AggCall])] = fields + .map(extractAggCalls(_, relBuilder)).toList + + // get aggregation calls + val aggCalls: List[AggCall] = extractedAggCalls + .map(_._2).reduce( (x,y) => x ::: y) + + // apply aggregations + if (aggCalls.nonEmpty) { + relBuilder.aggregate(groupKey, aggCalls.toIterable.asJava) + } + + // get selection expressions + val exprs: List[RexNode] = extractedAggCalls + .map(_._1) + .map(toRexNode(_, relBuilder)) + + relBuilder.project(exprs.toIterable.asJava) + new Table(relBuilder.build(), relBuilder) + } + + /** + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. 
+ * + * Example: + * + * {{{ + * in.select("key, value.avg + " The average" as average, other.substring(0, 10)") + * }}} + */ + def select(fields: String): Table = { + val fieldExprs = ExpressionParser.parseExpressionList(fields) + select(fieldExprs: _*) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala index a03ba61..9592f2e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala @@ -561,41 +561,6 @@ abstract class ExpressionCodeGenerator[R]( """.stripMargin } - case BitwiseAnd(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"(int) $leftTerm & (int) $rightTerm" - } - - case BitwiseOr(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"(int) $leftTerm | (int) $rightTerm" - } - - case BitwiseXor(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"(int) $leftTerm ^ (int) $rightTerm" - } - - case BitwiseNot(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code + - s""" - |boolean $nullTerm = ${childCode.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = ${defaultPrimitive(child.typeInfo)}; - |} else { - | $resultTerm = ~((int) ${childCode.resultTerm}); - |} - """.stripMargin - } else { - childCode.code + - s""" - |$resultTpe $resultTerm = ~((int) ${childCode.resultTerm}); - """.stripMargin - } - case Not(child) => val childCode = generateExpression(child) if (nullCheck) { http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala index 08e319d..d2fbdff 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala @@ -17,9 +17,8 @@ */ package org.apache.flink.api.table.expressions -import org.apache.flink.api.table.ExpressionException import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.java.aggregation.Aggregations +import org.apache.flink.api.table.ExpressionException abstract sealed class Aggregation extends UnaryExpression { self: Product => @@ -39,61 +38,24 @@ abstract sealed class Aggregation extends UnaryExpression { self: Product => } override def toString = s"Aggregate($child)" - - def getIntermediateFields: Seq[Expression] - def getFinalField(inputs: Seq[Expression]): Expression - def getAggregations: Seq[Aggregations] } case class Sum(child: Expression) extends Aggregation { override def toString = s"($child).sum" - - override 
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
 }
 
 case class Min(child: Expression) extends Aggregation {
   override def toString = s"($child).min"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MIN)
-
 }
 
 case class Max(child: Expression) extends Aggregation {
   override def toString = s"($child).max"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MAX)
 }
 
 case class Count(child: Expression) extends Aggregation {
-  override def typeInfo = {
-    child.typeInfo match {
-      case _ => // we can count anything... :D
-    }
-    BasicTypeInfo.INT_TYPE_INFO
-  }
-
   override def toString = s"($child).count"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1)))
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
-
 }
 
 case class Avg(child: Expression) extends Aggregation {
   override def toString = s"($child).avg"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
-  // This is just sweet. Use our own AST representation and let the code generator do
-  // our dirty work.
-  override def getFinalField(inputs: Seq[Expression]): Expression =
-    Div(inputs(0), inputs(1))
-  override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM)
-
 }
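For context on what this hunk deletes: the removed methods implemented Flink's own two-phase aggregation expansion. Avg, for example, was expanded into two SUM aggregates (sum of the values and sum of a constant 1) whose results were recombined by a division in the final projection; under the Calcite translation each Aggregation instead maps directly to an aggregate call. A self-contained sketch of the removed Avg strategy, using plain Scala collections rather than Flink operators:

    // Standalone illustration of the deleted two-phase Avg expansion:
    // avg(x) becomes (sum(x), sum(1)) during aggregation, divided at the end.
    val xs = Seq(3.0, 5.0, 10.0)
    val (sum, cnt) = xs.foldLeft((0.0, 0L)) { case ((s, c), x) => (s + x, c + 1) }
    val avg = sum / cnt  // 6.0, i.e. Div(inputs(0), inputs(1)) in the old AST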
http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
deleted file mode 100644
index 797de55..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.common.typeutils.CompositeType
-
-import scala.collection.mutable
-
-/**
- * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified
- * expression without the equi-join predicates together with indices of the join fields
- * from both the left and right input.
- */
-object ExtractEquiJoinFields {
-  def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
-
-    val joinFieldsLeft = mutable.MutableList[Int]()
-    val joinFieldsRight = mutable.MutableList[Int]()
-
-    val equiJoinExprs = mutable.MutableList[EqualTo]()
-    // First get all `===` expressions that are not below an `Or`
-    predicate.transformPre {
-      case or@Or(_, _) => NopExpression()
-      case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
-        if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
-          joinFieldsLeft += leftType.getFieldIndex(le.name)
-          joinFieldsRight += rightType.getFieldIndex(re.name)
-        } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
-          joinFieldsLeft += leftType.getFieldIndex(re.name)
-          joinFieldsRight += rightType.getFieldIndex(le.name)
-        } else {
-          // not an equi-join predicate
-        }
-        equiJoinExprs += eq
-        eq
-    }
-
-    // then remove the equi join expressions from the predicate
-    val resultExpr = predicate.transformPost {
-      // For OR, we can eliminate the OR since the equi join
-      // predicate is evaluated before the expression is evaluated
-      case or@Or(NopExpression(), _) => NopExpression()
-      case or@Or(_, NopExpression()) => NopExpression()
-      // For AND we replace it with the other expression, since the
-      // equi join predicate will always be true
-      case and@And(NopExpression(), other) => other
-      case and@And(other, NopExpression()) => other
-      case eq : EqualTo if equiJoinExprs.contains(eq) =>
-        NopExpression()
-    }
-
-    (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
-  }
-}
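For context on the deleted extractor: on a predicate such as 'a === 'b && 'c > 3 it recorded the field indices of a (left input) and b (right input) as equi-join keys and rewrote the predicate to the residual 'c > 3, since the equi-join condition is enforced by the join itself. A simplified, self-contained sketch of the same idea on a stand-in expression ADT (the types below are illustrative, not Flink's):

    // Minimal stand-in expression tree to illustrate the removed algorithm.
    sealed trait Expr
    case class Field(name: String) extends Expr
    case class Eq(l: Expr, r: Expr) extends Expr
    case class Gt(l: Expr, c: Int) extends Expr
    case class AndE(l: Expr, r: Expr) extends Expr
    case object True extends Expr

    // Split a conjunctive predicate into equi-join pairs and a residual filter.
    def extract(pred: Expr): (List[(String, String)], Expr) = pred match {
      case Eq(Field(l), Field(r)) => (List((l, r)), True)
      case AndE(l, r) =>
        val (kl, resL) = extract(l)
        val (kr, resR) = extract(r)
        val residual = (resL, resR) match {
          case (True, e) => e
          case (e, True) => e
          case (a, b)    => AndE(a, b)
        }
        (kl ++ kr, residual)
      case other => (Nil, other)
    }

    // extract(AndE(Eq(Field("a"), Field("b")), Gt(Field("c"), 3)))
    // == (List(("a", "b")), Gt(Field("c"), 3))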
http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
deleted file mode 100644
index 6c7ecb2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{ResolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.{Rule, Analyzer}
-
-
-/**
- * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
- */
-class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-
-  def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
-
-  object CheckGroupExpression extends Rule[Expression] {
-
-    def apply(expr: Expression) = {
-      val errors = mutable.MutableList[String]()
-
-      expr match {
-        case f: ResolvedFieldReference => // this is OK
-        case other =>
-          throw new ExpressionException(
-            s"""Invalid grouping expression "$expr". Only field references are allowed.""")
-      }
-      expr
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
deleted file mode 100644
index 0fdcab6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * [[Rule]] that adds casts in arithmetic operations.
- */
-class InsertAutoCasts extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val result = expr.transformPost {
-
-      case plus@Plus(o1, o2) =>
-        // Plus is special case since we can cast anything to String for String concat
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            Plus(Cast(o1, o2.typeInfo), o2)
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            Plus(o1, Cast(o2, o1.typeInfo))
-          } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
-            Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
-          } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
-            Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
-          } else {
-            plus
-          }
-        } else {
-          plus
-        }
-
-      case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
-          ba.isInstanceOf[BinaryComparison] =>
-        val o1 = ba.left
-        val o2 = ba.right
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
-          } else {
-            ba
-          }
-        } else {
-          ba
-        }
-
-      case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
-        val o1 = ba.left
-        val o2 = ba.right
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
-          o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
-          } else {
-            ba
-          }
-        } else {
-          ba
-        }
-    }
-
-    result
-  }
-}
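The deleted rule implemented the Table API's implicit widening: when two operand types differed, the side that could be widened losslessly was wrapped in a Cast; with this commit that responsibility moves into the Calcite-based translation. A self-contained sketch of the removed policy, where the widening table is a simplified stand-in for BasicTypeInfo.shouldAutocastTo and not Flink API:

    // Simplified auto-cast decision: widen the narrower operand if possible.
    val widening: Map[(String, String), Boolean] = Map(
      ("Int", "Long") -> true, ("Int", "Double") -> true, ("Long", "Double") -> true
    ).withDefaultValue(false)

    def autoCast(l: String, r: String): (String, String) =
      if (l == r) (l, r)
      else if (widening((l, r))) (r, r)  // cast left up to right
      else if (widening((r, l))) (l, l)  // cast right up to left
      else (l, r)                        // leave as-is; the type check reports it

    // autoCast("Int", "Double") == ("Double", "Double")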
http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
deleted file mode 100644
index e9236f7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Analyzer
-
-/**
- * Analyzer for predicates, i.e. filter operations and where clauses of joins.
- */
-class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-  def rules = Seq(
-    new ResolveFieldReferences(inputFields),
-    new InsertAutoCasts,
-    new TypeCheck,
-    new VerifyNoAggregates,
-    new VerifyBoolean)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
deleted file mode 100644
index db7ea6c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.{ResolvedFieldReference,
-UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table._
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * Rule that resolved field references. This rule verifies that field references point to existing
- * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
- * [[TypeInformation]] in addition to the field name.
- */
-class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])])
-  extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPost {
-      case fe@UnresolvedFieldReference(fieldName) =>
-        inputFields.find { _._1 == fieldName } match {
-          case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe)
-
-          case None =>
-            errors +=
-              s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}"
-            fe
-        }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
deleted file mode 100644
index 625fdbf..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Analyzer
-
-/**
- * This analyzes selection expressions.
- */
-class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-
-  def rules = Seq(
-    new ResolveFieldReferences(inputFields),
-    new VerifyNoNestedAggregates,
-    new InsertAutoCasts,
-    new TypeCheck)
-
-}
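These analyzers all followed the same pattern: an analysis was a fixed sequence of small tree-rewriting Rule passes (here: resolve field references, verify no nested aggregates, insert auto-casts, type-check), applied left to right. A simplified sketch of that pattern; the real trees.Analyzer and trees.Rule classes, also deleted by this commit, are reduced here to their essence:

    // Generic sketch of the removed Analyzer/Rule design: an analysis is a
    // left-to-right fold of small rewriting passes over the expression tree.
    trait Rule[A] { def apply(a: A): A }

    abstract class Analyzer[A] {
      def rules: Seq[Rule[A]]
      def analyze(a: A): A = rules.foldLeft(a) { (acc, rule) => rule(acc) }
    }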
http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
deleted file mode 100644
index b724561..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Rule
-import org.apache.flink.api.table.{_}
-
-import scala.collection.mutable
-
-/**
- * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
- * Expressions are expected to perform type verification in this method.
- */
-class TypeCheck extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case expr: Expression=> {
-        // simply get the typeInfo from the expression. this will perform type analysis
-        try {
-          expr.typeInfo
-        } catch {
-          case e: ExpressionException =>
-            errors += e.getMessage
-        }
-        expr
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
deleted file mode 100644
index e75dd20..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.{NopExpression, Expression}
-import org.apache.flink.api.table.trees.Rule
-import org.apache.flink.api.table.{_}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-import scala.collection.mutable
-
-/**
- * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean.
- * This is required for filter/join predicates.
- */
-class VerifyBoolean extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.")
-    }
-
-    expr
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4ee0c6d/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
deleted file mode 100644
index 09dbf88..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.expressions.{Aggregation, Expression}
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations. Right now, join
- * predicates and filter predicates cannot contain aggregates.
- */
-class VerifyNoAggregates extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case agg: Aggregation=> {
-        errors +=
-          s"""Aggregations are not allowed in join/filter predicates."""
-        agg
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}
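One idiom worth noting in the deleted rules above (also in TypeCheck): they traverse the whole tree, collect every error message, and only then throw a single ExpressionException, so the user sees all problems at once rather than just the first. A tiny generic sketch of that collect-then-throw pattern, reduced from the removed code:

    // Gather every failure before reporting, instead of failing fast.
    def validateAll[A](items: Seq[A])(check: A => Option[String]): Unit = {
      val errors = items.flatMap(check(_))
      if (errors.nonEmpty) {
        throw new IllegalArgumentException(errors.mkString("; "))
      }
    }

    // validateAll(Seq(1, -2, -3))(n => if (n < 0) Some(s"$n is negative") else None)
    // => IllegalArgumentException: -2 is negative; -3 is negative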