[FLINK-1623] Rename Expression API to Table API Package name is now flink-table. ExpressionOperation is renamed to Table.
This also adds more JavaDoc and ScalDoc. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9519c8d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9519c8d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9519c8d Branch: refs/heads/master Commit: c9519c8d6c869d2bfab186e449f0ad2b62484805 Parents: d7d9b63 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Mar 18 14:44:42 2015 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Sun Mar 29 12:27:53 2015 +0200 ---------------------------------------------------------------------- docs/linq.md | 90 ++- .../org/apache/flink/api/scala/DataSet.scala | 2 +- flink-staging/flink-expressions/pom.xml | 246 ------- .../api/java/expressions/package-info.java | 22 - .../examples/java/JavaExpressionExample.java | 69 -- .../api/expressions/ExpressionException.scala | 23 - .../api/expressions/ExpressionOperation.scala | 245 ------- .../org/apache/flink/api/expressions/Row.scala | 38 -- .../api/expressions/analysis/Analyzer.scala | 38 -- .../analysis/ExtractEquiJoinFields.scala | 70 -- .../expressions/analysis/GroupByAnalyzer.scala | 48 -- .../expressions/analysis/InsertAutoCasts.scala | 91 --- .../analysis/PredicateAnalyzer.scala | 32 - .../analysis/ResolveFieldReferences.scala | 57 -- .../flink/api/expressions/analysis/Rule.scala | 30 - .../analysis/SelectionAnalyzer.scala | 33 - .../api/expressions/analysis/TypeCheck.scala | 56 -- .../expressions/analysis/VerifyBoolean.scala | 40 -- .../analysis/VerifyNoAggregates.scala | 51 -- .../analysis/VerifyNoNestedAggregates.scala | 52 -- .../codegen/ExpressionCodeGenerator.scala | 635 ------------------- .../codegen/GenerateBinaryPredicate.scala | 73 --- .../codegen/GenerateBinaryResultAssembler.scala | 60 -- .../codegen/GenerateResultAssembler.scala | 99 --- .../codegen/GenerateUnaryPredicate.scala | 67 -- .../codegen/GenerateUnaryResultAssembler.scala | 57 -- .../flink/api/expressions/codegen/package.scala | 25 - .../operations/ExpandAggregations.scala | 144 ----- .../operations/OperationTranslator.scala | 35 - .../api/expressions/operations/operations.scala | 101 --- .../api/expressions/operations/package.scala | 24 - .../expressions/parser/ExpressionParser.scala | 209 ------ .../runtime/ExpressionAggregateFunction.scala | 72 --- .../runtime/ExpressionFilterFunction.scala | 47 -- .../runtime/ExpressionJoinFunction.scala | 76 --- .../runtime/ExpressionSelectFunction.scala | 51 -- .../flink/api/expressions/runtime/package.scala | 23 - .../flink/api/expressions/tree/Expression.scala | 149 ----- .../api/expressions/tree/aggregations.scala | 99 --- .../flink/api/expressions/tree/arithmetic.scala | 145 ----- .../flink/api/expressions/tree/cast.scala | 24 - .../flink/api/expressions/tree/comparison.scala | 93 --- .../api/expressions/tree/fieldExpression.scala | 41 -- .../flink/api/expressions/tree/literals.scala | 40 -- .../flink/api/expressions/tree/logic.scala | 58 -- .../flink/api/expressions/tree/package.scala | 29 - .../expressions/tree/stringExpressions.scala | 46 -- .../expressions/typeinfo/RenameOperator.scala | 36 -- .../typeinfo/RenamingProxyTypeInfo.scala | 109 ---- .../expressions/typeinfo/RowSerializer.scala | 121 ---- .../api/expressions/typeinfo/RowTypeInfo.scala | 51 -- .../api/java/expressions/ExpressionUtil.scala | 112 ---- .../scala/expressions/DataSetConversions.scala | 66 -- .../expressions/DataStreamConversions.scala | 65 -- .../scala/expressions/JavaBatchTranslator.scala | 392 ------------ .../expressions/JavaStreamingTranslator.scala | 303 --------- .../expressions/ScalaBatchTranslator.scala | 55 -- .../expressions/ScalaStreamingTranslator.scala | 56 -- .../api/scala/expressions/expressionDsl.scala | 124 ---- .../flink/api/scala/expressions/package.scala | 102 --- .../examples/scala/PageRankExpression.scala | 210 ------ .../scala/StreamingExpressionFilter.scala | 90 --- .../examples/scala/TPCHQuery3Expression.scala | 174 ----- .../expressions/test/AggregationsITCase.java | 210 ------ .../api/java/expressions/test/AsITCase.java | 158 ----- .../java/expressions/test/CastingITCase.java | 130 ---- .../expressions/test/ExpressionsITCase.java | 192 ------ .../api/java/expressions/test/FilterITCase.java | 130 ---- .../test/GroupedAggregationsITCase.java | 126 ---- .../api/java/expressions/test/JoinITCase.java | 202 ------ .../api/java/expressions/test/SelectITCase.java | 169 ----- .../test/StringExpressionsITCase.java | 144 ----- .../test/PageRankExpressionITCase.java | 100 --- .../expressions/test/AggregationsITCase.scala | 127 ---- .../api/scala/expressions/test/AsITCase.scala | 124 ---- .../scala/expressions/test/CastingITCase.scala | 92 --- .../expressions/test/ExpressionsITCase.scala | 127 ---- .../scala/expressions/test/FilterITCase.scala | 151 ----- .../test/GroupedAggreagationsITCase.scala | 96 --- .../api/scala/expressions/test/JoinITCase.scala | 145 ----- .../scala/expressions/test/SelectITCase.scala | 143 ----- .../test/StringExpressionsITCase.scala | 98 --- .../flink/streaming/api/scala/DataStream.scala | 5 + flink-staging/flink-table/pom.xml | 246 +++++++ .../flink/api/java/table/package-info.java | 60 ++ .../apache/flink/api/table/package-info.java | 33 + .../flink/examples/java/JavaTableExample.java | 71 +++ .../api/java/table/JavaBatchTranslator.scala | 319 ++++++++++ .../java/table/JavaStreamingTranslator.scala | 236 +++++++ .../flink/api/java/table/TableEnvironment.scala | 112 ++++ .../api/scala/table/DataSetConversions.scala | 67 ++ .../api/scala/table/DataStreamConversions.scala | 68 ++ .../api/scala/table/ScalaBatchTranslator.scala | 68 ++ .../scala/table/ScalaStreamingTranslator.scala | 58 ++ .../flink/api/scala/table/expressionDsl.scala | 124 ++++ .../apache/flink/api/scala/table/package.scala | 101 +++ .../flink/api/table/ExpressionException.scala | 23 + .../scala/org/apache/flink/api/table/Row.scala | 38 ++ .../org/apache/flink/api/table/Table.scala | 243 +++++++ .../flink/api/table/analysis/Analyzer.scala | 38 ++ .../table/analysis/ExtractEquiJoinFields.scala | 70 ++ .../api/table/analysis/GroupByAnalyzer.scala | 48 ++ .../api/table/analysis/InsertAutoCasts.scala | 91 +++ .../api/table/analysis/PredicateAnalyzer.scala | 32 + .../table/analysis/ResolveFieldReferences.scala | 57 ++ .../apache/flink/api/table/analysis/Rule.scala | 30 + .../api/table/analysis/SelectionAnalyzer.scala | 33 + .../flink/api/table/analysis/TypeCheck.scala | 56 ++ .../api/table/analysis/VerifyBoolean.scala | 40 ++ .../api/table/analysis/VerifyNoAggregates.scala | 51 ++ .../analysis/VerifyNoNestedAggregates.scala | 52 ++ .../table/codegen/ExpressionCodeGenerator.scala | 635 +++++++++++++++++++ .../table/codegen/GenerateBinaryPredicate.scala | 73 +++ .../codegen/GenerateBinaryResultAssembler.scala | 60 ++ .../table/codegen/GenerateResultAssembler.scala | 99 +++ .../table/codegen/GenerateUnaryPredicate.scala | 67 ++ .../codegen/GenerateUnaryResultAssembler.scala | 57 ++ .../flink/api/table/codegen/package.scala | 25 + .../table/operations/ExpandAggregations.scala | 144 +++++ .../api/table/operations/TableTranslator.scala | 158 +++++ .../flink/api/table/operations/operations.scala | 101 +++ .../flink/api/table/operations/package.scala | 24 + .../org/apache/flink/api/table/package.scala | 34 + .../api/table/parser/ExpressionParser.scala | 209 ++++++ .../runtime/ExpressionAggregateFunction.scala | 72 +++ .../runtime/ExpressionFilterFunction.scala | 47 ++ .../table/runtime/ExpressionJoinFunction.scala | 76 +++ .../runtime/ExpressionSelectFunction.scala | 51 ++ .../flink/api/table/runtime/package.scala | 23 + .../flink/api/table/tree/Expression.scala | 149 +++++ .../flink/api/table/tree/aggregations.scala | 99 +++ .../flink/api/table/tree/arithmetic.scala | 145 +++++ .../org/apache/flink/api/table/tree/cast.scala | 24 + .../flink/api/table/tree/comparison.scala | 93 +++ .../flink/api/table/tree/fieldExpression.scala | 41 ++ .../apache/flink/api/table/tree/literals.scala | 40 ++ .../org/apache/flink/api/table/tree/logic.scala | 58 ++ .../apache/flink/api/table/tree/package.scala | 29 + .../api/table/tree/stringExpressions.scala | 46 ++ .../api/table/typeinfo/RenameOperator.scala | 36 ++ .../table/typeinfo/RenamingProxyTypeInfo.scala | 109 ++++ .../api/table/typeinfo/RowSerializer.scala | 121 ++++ .../flink/api/table/typeinfo/RowTypeInfo.scala | 51 ++ .../examples/scala/PageRankExpression.scala | 210 ++++++ .../scala/StreamingExpressionFilter.scala | 90 +++ .../examples/scala/TPCHQuery3Expression.scala | 174 +++++ .../api/java/table/test/AggregationsITCase.java | 215 +++++++ .../flink/api/java/table/test/AsITCase.java | 165 +++++ .../api/java/table/test/CastingITCase.java | 133 ++++ .../api/java/table/test/ExpressionsITCase.java | 197 ++++++ .../flink/api/java/table/test/FilterITCase.java | 133 ++++ .../table/test/GroupedAggregationsITCase.java | 131 ++++ .../flink/api/java/table/test/JoinITCase.java | 216 +++++++ .../flink/api/java/table/test/SelectITCase.java | 180 ++++++ .../table/test/StringExpressionsITCase.java | 150 +++++ .../table/test/PageRankExpressionITCase.java | 100 +++ .../scala/table/test/AggregationsITCase.scala | 127 ++++ .../flink/api/scala/table/test/AsITCase.scala | 124 ++++ .../api/scala/table/test/CastingITCase.scala | 92 +++ .../scala/table/test/ExpressionsITCase.scala | 127 ++++ .../api/scala/table/test/FilterITCase.scala | 151 +++++ .../table/test/GroupedAggreagationsITCase.scala | 96 +++ .../flink/api/scala/table/test/JoinITCase.scala | 145 +++++ .../api/scala/table/test/SelectITCase.scala | 143 +++++ .../table/test/StringExpressionsITCase.scala | 98 +++ flink-staging/pom.xml | 2 +- 166 files changed, 8727 insertions(+), 8523 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/docs/linq.md ---------------------------------------------------------------------- diff --git a/docs/linq.md b/docs/linq.md index ebb0063..79fe6f2 100644 --- a/docs/linq.md +++ b/docs/linq.md @@ -1,5 +1,5 @@ --- -title: "Language-Integrated Queries" +title: "Language-Integrated Queries (Table API)" --- <!-- Licensed to the Apache Software Foundation (ASF) under one @@ -23,58 +23,92 @@ under the License. * This will be replaced by the TOC {:toc} -**Language-Integrated Queries are an experimental feature and can currently only be used with -the Scala API** +**Language-Integrated Queries are an experimental feature** -Flink provides an API that allows specifying operations using SQL-like expressions. -This Expression API can be enabled by importing -`org.apache.flink.api.scala.expressions._`. This enables implicit conversions that allow -converting a `DataSet` or `DataStream` to an `ExpressionOperation` on which relational queries -can be specified. This example shows how a `DataSet` can be converted, how expression operations -can be specified and how an expression operation can be converted back to a `DataSet`: +Flink provides an API that allows specifying operations using SQL-like expressions. Instead of +manipulating `DataSet` or `DataStream` you work with `Table` on which relational operations can +be performed. + +The following dependency must be added to your project when using the Table API: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table</artifactId> + <version>{{site.FLINK_VERSION_SHORT }}</version> +</dependency> +{% endhighlight %} + +## Scala Table API + +The Table API can be enabled by importing `org.apache.flink.api.scala.table._`. This enables +implicit conversions that allow +converting a DataSet or DataStream to a Table. This example shows how a DataSet can +be converted, how relational queries can be specified and how a Table can be +converted back to a DataSet: {% highlight scala %} import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.expressions._ +import org.apache.flink.api.scala.table._ case class WC(word: String, count: Int) val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) -val expr = input.toExpression -val result = expr.groupBy('word).select('word, 'count.sum).as[WC] +val expr = input.toTable +val result = expr.groupBy('word).select('word, 'count.sum).toSet[WC] {% endhighlight %} The expression DSL uses Scala symbols to refer to field names and we use code generation to -transform expressions to efficient runtime code. Please not that the conversion to and from -expression operations only works when using Scala case classes or Flink POJOs. Please check out +transform expressions to efficient runtime code. Please note that the conversion to and from +Tables only works when using Scala case classes or Flink POJOs. Please check out the [programming guide](programming_guide.html) to learn the requirements for a class to be considered a POJO. This is another example that shows how you -can join to operations: +can join to Tables: {% highlight scala %} case class MyResult(a: String, b: Int) val input1 = env.fromElements(...).as('a, 'b) val input2 = env.fromElements(...).as('c, 'd) -val joined = input1.join(input2).where('b == 'a && 'd > 42).select('a, 'd).as[MyResult] +val joined = input1.join(input2).where("b = a && d > 42").select("a, d").as[MyResult] {% endhighlight %} -Notice, how a `DataSet` can be converted to an expression operation by using `as` and specifying new -names for the fields. This can also be used to disambiguate fields before a join operation. +Notice, how a DataSet can be converted to a Table by using `as` and specifying new +names for the fields. This can also be used to disambiguate fields before a join operation. Also, +in this example we see that you can also use Strings to specify relational expressions. -The Expression API can be used with the Streaming API, since we also have implicit conversions to -and from `DataStream`. +Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a +description of the expression syntax. -The following dependency must be added to your project when using the Expression API: +## Java Table API -{% highlight xml %} -<dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-expressions</artifactId> - <version>{{site.FLINK_VERSION_SHORT }}</version> -</dependency> +When using Java, Tables can be converted to and from DataSet and DataStream using `TableEnvironment`. +This example is equivalent to the above Scala Example: + +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); +TableEnvironment tableEnv = new TableEnvironment(); + +DataSet<WC> input = env.fromElements( + new WC("Hello", 1), + new WC("Ciao", 1), + new WC("Hello", 1)); + +Table table = tableEnv.toTable(input); + +Table filtered = table + .groupBy("word") + .select("word.count as count, word") + .filter("count = 2"); + +DataSet<WC> result = tableEnv.toSet(filtered, WC.class); {% endhighlight %} -Please refer to the scaladoc for a full list of supported operations and a description of the +When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions +are supported. They support exactly the same feature set as the expression DSL. + +Please refer to the Javadoc for a full list of supported operations and a description of the expression syntax. + + http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 2732112..de07a57 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -90,7 +90,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { /** * Returns the TypeInformation for the elements of this DataSet. */ - def getType: TypeInformation[T] = set.getType + def getType(): TypeInformation[T] = set.getType /** * Returns the execution environment associated with the current DataSet. http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/pom.xml b/flink-staging/flink-expressions/pom.xml deleted file mode 100644 index f26ab03..0000000 --- a/flink-staging/flink-expressions/pom.xml +++ /dev/null @@ -1,246 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-staging</artifactId> - <version>0.9-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-expressions</artifactId> - <name>flink-expressions</name> - - <packaging>jar</packaging> - - <dependencies> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala-examples</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.1.4</version> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - - <!-- Run scala compiler in the process-test-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) test-compile phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - <compilerPlugins combine.children="append"> - <compilerPlugin> - <groupId>org.scalamacros</groupId> - <artifactId>paradise_${scala.version}</artifactId> - <version>${scala.macros.version}</version> - </compilerPlugin> - </compilerPlugins> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - <!-- Add src/test/scala to eclipse build path --> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <version>0.5.0</version> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - <configuration> - <verbose>false</verbose> - <failOnViolation>true</failOnViolation> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - <failOnWarning>false</failOnWarning> - <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> - <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> - <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> - <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> - <outputEncoding>UTF-8</outputEncoding> - </configuration> - </plugin> - - </plugins> - </build> - - <profiles> - <profile> - <id>scala-2.10</id> - <activation> - <property> - <!-- this is the default scala profile --> - <name>!scala-2.11</name> - </property> - </activation> - <dependencies> - <dependency> - <groupId>org.scalamacros</groupId> - <artifactId>quasiquotes_${scala.binary.version}</artifactId> - <version>${scala.macros.version}</version> - </dependency> - </dependencies> - </profile> - </profiles> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java deleted file mode 100644 index 07e18b2..0000000 --- a/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package doc wohoooo - */ -package org.apache.flink.api.java.expressions; http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java deleted file mode 100644 index 42632f9..0000000 --- a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.java; - - -import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.expressions.ExpressionUtil; - -/** - * Very simple example that shows how the Java Expression API can be used. - */ -public class JavaExpressionExample { - - public static class WC { - public String word; - public int count; - - public WC() { - - } - - public WC(String word, int count) { - this.word = word; - this.count = count; - } - - @Override - public String toString() { - return "WC " + word + " " + count; - } - } - public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - - DataSet<WC> input = env.fromElements( - new WC("Hello", 1), - new WC("Ciao", 1), - new WC("Hello", 1)); - - ExpressionOperation expr = ExpressionUtil.from(input); - - ExpressionOperation filtered = expr - .groupBy("word") - .select("word.count as count, word") - .filter("count = 2"); - - DataSet<WC> result = ExpressionUtil.toSet(filtered, WC.class); - - result.print(); - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala deleted file mode 100644 index 34e400f..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionException.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions - -/** - * Exception for all errors occurring during expression operations. - */ -class ExpressionException(msg: String) extends RuntimeException(msg) http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala deleted file mode 100644 index 38417b2..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.expressions.analysis.{GroupByAnalyzer, SelectionAnalyzer, -PredicateAnalyzer} -import org.apache.flink.api.expressions.operations._ -import org.apache.flink.api.expressions.parser.ExpressionParser -import org.apache.flink.api.expressions.tree.{ResolvedFieldReference, -UnresolvedFieldReference, Expression} - -/** - * The abstraction for writing expression API programs. Similar to how the batch and streaming APIs - * have [[org.apache.flink.api.scala.DataSet]] and - * [[org.apache.flink.streaming.api.scala.DataStream]]. - * - * Use the methods of [[ExpressionOperation]] to transform data or to revert back to the underlying - * batch or streaming representation. - */ -case class ExpressionOperation[A <: OperationTranslator]( - private[flink] val operation: Operation, - private[flink] val operationTranslator: A) { - - - /** - * Converts the result of this operation back to a [[org.apache.flink.api.scala.DataSet]] or - * [[org.apache.flink.streaming.api.scala.DataStream]]. - */ - def as[O](implicit tpe: TypeInformation[O]): operationTranslator.Representation[O] = { - operationTranslator.translate(operation) - } - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10)) - * }}} - */ - def select(fields: Expression*): ExpressionOperation[A] = { - val analyzer = new SelectionAnalyzer(operation.outputFields) - val analyzedFields = fields.map(analyzer.analyze) - val fieldNames = analyzedFields map(_.name) - if (fieldNames.toSet.size != fieldNames.size) { - throw new ExpressionException(s"Resulting fields names are not unique in expression" + - s""" "${fields.mkString(", ")}".""") - } - this.copy(operation = Select(operation, analyzedFields)) - } - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * in.select("key, value.avg + " The average" as average, other.substring(0, 10)") - * }}} - */ - def select(fields: String): ExpressionOperation[A] = { - val fieldExprs = ExpressionParser.parseExpressionList(fields) - select(fieldExprs: _*) - } - - /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * in.as('a, 'b) - * }}} - */ - def as(fields: Expression*): ExpressionOperation[A] = { - fields forall { - f => f.isInstanceOf[UnresolvedFieldReference] - } match { - case true => - case false => throw new ExpressionException("Only field expression allowed in as().") - } - this.copy(operation = As(operation, fields.toArray map { _.name })) - } - - /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * in.as("a, b") - * }}} - */ - def as(fields: String): ExpressionOperation[A] = { - val fieldExprs = ExpressionParser.parseExpressionList(fields) - as(fieldExprs: _*) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter('name === "Fred") - * }}} - */ - def filter(predicate: Expression): ExpressionOperation[A] = { - val analyzer = new PredicateAnalyzer(operation.outputFields) - val analyzedPredicate = analyzer.analyze(predicate) - this.copy(operation = Filter(operation, analyzedPredicate)) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter("name === 'Fred'") - * }}} - */ - def filter(predicate: String): ExpressionOperation[A] = { - val predicateExpr = ExpressionParser.parseExpression(predicate) - filter(predicateExpr) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter(name === "Fred") - * }}} - */ - def where(predicate: Expression): ExpressionOperation[A] = { - filter(predicate) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter("name === 'Fred'") - * }}} - */ - def where(predicate: String): ExpressionOperation[A] = { - filter(predicate) - } - - /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * in.groupBy('key).select('key, 'value.avg) - * }}} - */ - def groupBy(fields: Expression*): ExpressionOperation[A] = { - val analyzer = new GroupByAnalyzer(operation.outputFields) - val analyzedFields = fields.map(analyzer.analyze) - - val illegalKeys = analyzedFields filter { - case fe: ResolvedFieldReference => false // OK - case e => true - } - - if (illegalKeys.nonEmpty) { - throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", ")) - } - - this.copy(operation = GroupBy(operation, analyzedFields)) - } - - /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * in.groupBy("key").select("key, value.avg") - * }}} - */ - def groupBy(fields: String): ExpressionOperation[A] = { - val fieldsExpr = ExpressionParser.parseExpressionList(fields) - groupBy(fieldsExpr: _*) - } - - /** - * Joins to expression operations. Similar to an SQL join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. You can use - * where and select clauses after a join to further specify the behaviour of the join. - * - * Example: - * - * {{{ - * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd) - * }}} - */ - def join(right: ExpressionOperation[A]): ExpressionOperation[A] = { - val leftInputNames = operation.outputFields.map(_._1).toSet - val rightInputNames = right.operation.outputFields.map(_._1).toSet - if (leftInputNames.intersect(rightInputNames).nonEmpty) { - throw new ExpressionException( - "Overlapping fields names on join input, result would be ambiguous: " + - operation.outputFields.mkString(", ") + - " and " + - right.operation.outputFields.mkString(", ") ) - } - this.copy(operation = Join(operation, right.operation)) - } - - override def toString: String = s"Expression($operation)" -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala deleted file mode 100644 index 47ef59e..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/Row.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions - -/** - * This is used for executing expression operations. We use manually generated - * TypeInfo to check the field types and create serializers and comparators. - */ -class Row(arity: Int) extends Product { - - private val fields = new Array[Any](arity) - - def productArity = fields.length - - def productElement(i: Int): Any = fields(i) - - def setField(i: Int, value: Any): Unit = fields(i) = value - - def canEqual(that: Any) = false - - override def toString = fields.mkString(",") - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala deleted file mode 100644 index da71cdd..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Analyzer.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.tree.Expression - -/** - * Base class for expression analyzers/transformers. Analyzers must implement method `rules` to - * provide the chain of rules that are invoked one after another. The expression resulting - * from one rule is fed into the next rule and the final result is returned from method `analyze`. - */ -abstract class Analyzer { - - def rules: Seq[Rule] - - final def analyze(expr: Expression): Expression = { - var currentTree = expr - for (rule <- rules) { - currentTree = rule(currentTree) - } - currentTree - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala deleted file mode 100644 index a4f8f25..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ExtractEquiJoinFields.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.tree._ -import org.apache.flink.api.common.typeutils.CompositeType - -import scala.collection.mutable - -/** - * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified - * expression without the equi-join predicates together with indices of the join fields - * from both the left and right input. - */ -object ExtractEquiJoinFields { - def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = { - - val joinFieldsLeft = mutable.MutableList[Int]() - val joinFieldsRight = mutable.MutableList[Int]() - - val equiJoinExprs = mutable.MutableList[EqualTo]() - // First get all `===` expressions that are not below an `Or` - predicate.transformPre { - case or@Or(_, _) => NopExpression() - case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) => - if (leftType.hasField(le.name) && rightType.hasField(re.name)) { - joinFieldsLeft += leftType.getFieldIndex(le.name) - joinFieldsRight += rightType.getFieldIndex(re.name) - } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) { - joinFieldsLeft += leftType.getFieldIndex(re.name) - joinFieldsRight += rightType.getFieldIndex(le.name) - } else { - // not an equi-join predicate - } - equiJoinExprs += eq - eq - } - - // then remove the equi join expressions from the predicate - val resultExpr = predicate.transformPost { - // For OR, we can eliminate the OR since the equi join - // predicate is evaluated before the expression is evaluated - case or@Or(NopExpression(), _) => NopExpression() - case or@Or(_, NopExpression()) => NopExpression() - // For AND we replace it with the other expression, since the - // equi join predicate will always be true - case and@And(NopExpression(), other) => other - case and@And(other, NopExpression()) => other - case eq : EqualTo if equiJoinExprs.contains(eq) => - NopExpression() - } - - (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala deleted file mode 100644 index 21f989c..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/GroupByAnalyzer.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions._ -import org.apache.flink.api.expressions.tree.{ResolvedFieldReference, Expression} -import org.apache.flink.api.common.typeinfo.TypeInformation - -import scala.collection.mutable - - -/** - * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions. - */ -class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer { - - def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression) - - object CheckGroupExpression extends Rule { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - expr match { - case f: ResolvedFieldReference => // this is OK - case other => - throw new ExpressionException( - s"""Invalid grouping expression "$expr". Only field references are allowed.""") - } - expr - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala deleted file mode 100644 index 319e72f..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/InsertAutoCasts.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.tree._ -import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo} - -/** - * [[Rule]] that adds casts in arithmetic operations. - */ -class InsertAutoCasts extends Rule { - - def apply(expr: Expression) = { - val result = expr.transformPost { - - case plus@Plus(o1, o2) => - // Plus is special case since we can cast anything to String for String concat - if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) { - if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( - o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - Plus(Cast(o1, o2.typeInfo), o2) - } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( - o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - Plus(o1, Cast(o2, o1.typeInfo)) - } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) { - Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO)) - } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) { - Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2) - } else { - plus - } - } else { - plus - } - - case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] || - ba.isInstanceOf[BinaryComparison] => - val o1 = ba.left - val o2 = ba.right - if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) { - if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( - o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2)) - } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( - o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo))) - } else { - ba - } - } else { - ba - } - - case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] => - val o1 = ba.left - val o2 = ba.right - if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] && - o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( - o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2)) - } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( - o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { - ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo))) - } else { - ba - } - } else { - ba - } - } - - result - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala deleted file mode 100644 index f108f5c..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.common.typeinfo.TypeInformation - -/** - * Analyzer for unary predicates, i.e. filter operations. - */ -class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer { - def rules = Seq( - new ResolveFieldReferences(inputFields), - new InsertAutoCasts, - new TypeCheck, - new VerifyNoAggregates, - new VerifyBoolean) -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala deleted file mode 100644 index 693dd88..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/ResolveFieldReferences.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.tree.{ResolvedFieldReference, -UnresolvedFieldReference, Expression} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.expressions._ - -import scala.collection.mutable - -/** - * Rule that resolved field references. This rule verifies that field references point to existing - * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field - * [[TypeInformation]] in addition to the field name. - */ -class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])]) extends Rule { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - val result = expr.transformPost { - case fe@UnresolvedFieldReference(fieldName) => - inputFields.find { _._1 == fieldName } match { - case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe) - - case None => - errors += - s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}" - fe - } - } - - if (errors.length > 0) { - throw new ExpressionException( - s"""Invalid expression "$expr": ${errors.mkString(" ")}""") - } - - result - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala deleted file mode 100644 index 853ee7a..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/Rule.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.tree.Expression - -/** - * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `rule` gets on - * [[Expression]] and must return an expression. The returned [[Expression]] can also be - * the input [[Expression]]. In an [[Analyzer]] rule chain the result [[Expression]] of one - * [[Rule]] is fed into the next [[Rule]] in the chain. - */ -abstract class Rule { - def apply(expr: Expression): Expression -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala deleted file mode 100644 index eca007f..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/SelectionAnalyzer.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.common.typeinfo.TypeInformation - -/** - * This analyzes selection expressions. - */ -class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer { - - def rules = Seq( - new ResolveFieldReferences(inputFields), - new VerifyNoNestedAggregates, - new InsertAutoCasts, - new TypeCheck) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala deleted file mode 100644 index 632daa3..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/TypeCheck.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.expressions.{_} - -import scala.collection.mutable - -/** - * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once. - * Expressions are expected to perform type verification in this method. - */ -class TypeCheck extends Rule { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - val result = expr.transformPre { - case expr: Expression=> { - // simply get the typeInfo from the expression. this will perform type analysis - try { - expr.typeInfo - } catch { - case e: ExpressionException => - errors += e.getMessage - } - expr - } - } - - if (errors.length > 0) { - throw new ExpressionException( - s"""Invalid expression "$expr": ${errors.mkString(" ")}""") - } - - result - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala deleted file mode 100644 index d0bd6b6..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyBoolean.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.tree.{NopExpression, Expression} -import org.apache.flink.api.expressions.{_} -import org.apache.flink.api.common.typeinfo.BasicTypeInfo - -import scala.collection.mutable - -/** - * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required - * for filter/join predicates. - */ -class VerifyBoolean extends Rule { - - def apply(expr: Expression) = { - if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { - throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.") - } - - expr - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala deleted file mode 100644 index e9f8788..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.expressions.tree.{Aggregation, Expression} - -import scala.collection.mutable - -/** - * Rule that verifies that an expression does not contain aggregate operations. Right now, join - * predicates and filter predicates cannot contain aggregates. - */ -class VerifyNoAggregates extends Rule { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - val result = expr.transformPre { - case agg: Aggregation=> { - errors += - s"""Aggregations are not allowed in join/filter predicates.""" - agg - } - } - - if (errors.length > 0) { - throw new ExpressionException( - s"""Invalid expression "$expr": ${errors.mkString(" ")}""") - } - - result - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala deleted file mode 100644 index de5063a..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoNestedAggregates.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.expressions.analysis - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.expressions.tree.{Expression, Aggregation} - -import scala.collection.mutable - -/** - * Rule that verifies that an expression does not contain aggregate operations - * as children of aggregate operations. - */ -class VerifyNoNestedAggregates extends Rule { - - def apply(expr: Expression) = { - val errors = mutable.MutableList[String]() - - val result = expr.transformPre { - case agg: Aggregation=> { - if (agg.child.exists(_.isInstanceOf[Aggregation])) { - errors += s"""Found nested aggregation inside "$agg".""" - } - agg - } - } - - if (errors.length > 0) { - throw new ExpressionException( - s"""Invalid expression "$expr": ${errors.mkString(" ")}""") - } - - result - - } -}