[FLINK-947] Add parser to Expression API for exposing it to Java
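In short, this commit adds a string parser so the Expression API can be used from Java without hand-building Expression ASTs. A minimal sketch of the resulting usage, adapted from the JavaExpressionExample touched in the diff below (the body of the WC POJO is assumed here, since only fragments of it appear in this excerpt):

    import org.apache.flink.api.expressions.ExpressionOperation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.expressions.ExpressionUtil;

    public class WordCountExpressions {

        // Simple word-count POJO; assumed shape, the full class is not shown in this excerpt.
        public static class WC {
            public String word;
            public int count;
            public WC() {}
            public WC(String word, int count) { this.word = word; this.count = count; }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<WC> input = env.fromElements(
                new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));

            // The field names of WC become the table fields; the strings below are parsed
            // by the new ExpressionParser into the same AST the Scala DSL builds.
            ExpressionOperation expr = ExpressionUtil.from(input);
            ExpressionOperation filtered = expr
                .groupBy("word")
                .select("word.count as count, word")
                .filter("count = 2");

            DataSet<WC> result = ExpressionUtil.toSet(filtered, WC.class);
            result.print();
            env.execute();
        }
    }
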
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7d9b639 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7d9b639 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7d9b639 Branch: refs/heads/master Commit: d7d9b639025a74b322b51e5a35f656c57fa28fc6 Parents: 659ddc0 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri Mar 6 11:53:48 2015 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Sun Mar 29 12:27:53 2015 +0200 ---------------------------------------------------------------------- .../api/common/typeutils/CompositeType.java | 12 ++ .../api/java/typeutils/TupleTypeInfoBase.java | 5 + .../api/java/expressions/package-info.java | 22 ++ .../examples/java/JavaExpressionExample.java | 27 +-- .../api/expressions/ExpressionOperation.scala | 79 ++++++- .../analysis/PredicateAnalyzer.scala | 2 +- .../analysis/VerifyNoAggregates.scala | 51 +++++ .../expressions/parser/ExpressionParser.scala | 209 ++++++++++++++++++ .../api/java/expressions/ExpressionUtil.scala | 112 ++++++++++ .../scala/expressions/JavaBatchTranslator.scala | 40 +++- .../expressions/JavaStreamingTranslator.scala | 27 ++- .../api/scala/expressions/expressionDsl.scala | 7 + .../expressions/test/AggregationsITCase.java | 210 +++++++++++++++++++ .../api/java/expressions/test/AsITCase.java | 158 ++++++++++++++ .../java/expressions/test/CastingITCase.java | 130 ++++++++++++ .../expressions/test/ExpressionsITCase.java | 192 +++++++++++++++++ .../api/java/expressions/test/FilterITCase.java | 130 ++++++++++++ .../test/GroupedAggregationsITCase.java | 126 +++++++++++ .../api/java/expressions/test/JoinITCase.java | 202 ++++++++++++++++++ .../api/java/expressions/test/SelectITCase.java | 169 +++++++++++++++ .../test/StringExpressionsITCase.java | 144 +++++++++++++ .../test/PageRankExpressionITCase.java | 100 +++++++++ .../scala/expressions/AggregationsITCase.scala | 126 ----------- .../flink/api/scala/expressions/AsITCase.scala | 126 ----------- .../api/scala/expressions/CastingITCase.scala | 93 -------- .../scala/expressions/ExpressionsITCase.scala | 126 ----------- .../api/scala/expressions/FilterITCase.scala | 150 ------------- .../GroupedAggreagationsITCase.scala | 99 --------- .../api/scala/expressions/JoinITCase.scala | 132 ------------ .../expressions/PageRankExpressionITCase.java | 100 --------- .../api/scala/expressions/SelectITCase.scala | 130 ------------ .../expressions/StringExpressionsITCase.scala | 97 --------- .../expressions/test/AggregationsITCase.scala | 127 +++++++++++ .../api/scala/expressions/test/AsITCase.scala | 124 +++++++++++ .../scala/expressions/test/CastingITCase.scala | 92 ++++++++ .../expressions/test/ExpressionsITCase.scala | 127 +++++++++++ .../scala/expressions/test/FilterITCase.scala | 151 +++++++++++++ .../test/GroupedAggreagationsITCase.scala | 96 +++++++++ .../api/scala/expressions/test/JoinITCase.scala | 145 +++++++++++++ .../scala/expressions/test/SelectITCase.scala | 143 +++++++++++++ .../test/StringExpressionsITCase.scala | 98 +++++++++ 41 files changed, 3233 insertions(+), 1203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java index 36778d6..54a1e13 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java @@ -176,6 +176,18 @@ public abstract class CompositeType<T> extends TypeInformation<T> { public abstract String[] getFieldNames(); /** + * True if this type has an inherent ordering of the fields, such that a user can + * always be sure in which order the fields will be in. This is true for Tuples and + * Case Classes. It is not true for Regular Java Objects, since there, the ordering of + * the fields can be arbitrary. + * + * This is used when translating a DataSet or DataStream to an Expression Table, when + * initially renaming the fields of the underlying type. + */ + public boolean hasDeterministicFieldOrder() { + return false; + } + /** * Returns the field index of the composite field of the given name. * * @return The field index or -1 if this type does not have a field of the given name. http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index ce075a7..d1c2c9d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -272,4 +272,9 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { bld.append('>'); return bld.toString(); } + + @Override + public boolean hasDeterministicFieldOrder() { + return true; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java new file mode 100644 index 0000000..07e18b2 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Package doc wohoooo + */ +package org.apache.flink.api.java.expressions; http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java index 055eaee..42632f9 100644 --- a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java +++ b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java @@ -19,20 +19,12 @@ package org.apache.flink.examples.java; import org.apache.flink.api.expressions.ExpressionOperation; -import org.apache.flink.api.expressions.tree.EqualTo$; -import org.apache.flink.api.expressions.tree.Expression; -import org.apache.flink.api.expressions.tree.Literal$; -import org.apache.flink.api.expressions.tree.UnresolvedFieldReference$; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.scala.expressions.JavaBatchTranslator; +import org.apache.flink.api.java.expressions.ExpressionUtil; /** - * This is extremely bare-bones. We need a parser that can parse expressions in a String - * and create the correct expression AST. Then we can use expressions like this: - * - * {@code in.select("'field0.avg, 'field1.count") } + * Very simple example that shows how the Java Expression API can be used. */ public class JavaExpressionExample { @@ -60,17 +52,16 @@ public class JavaExpressionExample { DataSet<WC> input = env.fromElements( new WC("Hello", 1), new WC("Ciao", 1), - new WC("Hello", 1) - ); + new WC("Hello", 1)); - ExpressionOperation<JavaBatchTranslator> expr = new JavaBatchTranslator().createExpressionOperation( - input, - new Expression[] { UnresolvedFieldReference$.MODULE$.apply("count"), UnresolvedFieldReference$.MODULE$.apply("word")}); + ExpressionOperation expr = ExpressionUtil.from(input); - ExpressionOperation<JavaBatchTranslator> filtered = expr.filter( - EqualTo$.MODULE$.apply(UnresolvedFieldReference$.MODULE$.apply("word"), Literal$.MODULE$.apply("Hello"))); + ExpressionOperation filtered = expr + .groupBy("word") + .select("word.count as count, word") + .filter("count = 2"); - DataSet<WC> result = (DataSet<WC>) filtered.as(TypeExtractor.createTypeInfo(WC.class)); + DataSet<WC> result = ExpressionUtil.toSet(filtered, WC.class); result.print(); env.execute(); http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala index d843c5b..38417b2 100644 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala @@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import 
org.apache.flink.api.expressions.analysis.{GroupByAnalyzer, SelectionAnalyzer, PredicateAnalyzer} import org.apache.flink.api.expressions.operations._ +import org.apache.flink.api.expressions.parser.ExpressionParser import org.apache.flink.api.expressions.tree.{ResolvedFieldReference, UnresolvedFieldReference, Expression} @@ -67,6 +68,21 @@ case class ExpressionOperation[A <: OperationTranslator]( } /** + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. + * + * Example: + * + * {{{ + * in.select("key, value.avg + " The average" as average, other.substring(0, 10)") + * }}} + */ + def select(fields: String): ExpressionOperation[A] = { + val fieldExprs = ExpressionParser.parseExpressionList(fields) + select(fieldExprs: _*) + } + + /** * Renames the fields of the expression result. Use this to disambiguate fields before * joining to operations. * @@ -84,7 +100,21 @@ case class ExpressionOperation[A <: OperationTranslator]( case false => throw new ExpressionException("Only field expression allowed in as().") } this.copy(operation = As(operation, fields.toArray map { _.name })) + } + /** + * Renames the fields of the expression result. Use this to disambiguate fields before + * joining to operations. + * + * Example: + * + * {{{ + * in.as("a, b") + * }}} + */ + def as(fields: String): ExpressionOperation[A] = { + val fieldExprs = ExpressionParser.parseExpressionList(fields) + as(fieldExprs: _*) } /** @@ -110,7 +140,22 @@ case class ExpressionOperation[A <: OperationTranslator]( * Example: * * {{{ - * in.filter('name === "Fred") + * in.filter("name === 'Fred'") + * }}} + */ + def filter(predicate: String): ExpressionOperation[A] = { + val predicateExpr = ExpressionParser.parseExpression(predicate) + filter(predicateExpr) + } + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.filter(name === "Fred") * }}} */ def where(predicate: Expression): ExpressionOperation[A] = { @@ -118,6 +163,20 @@ case class ExpressionOperation[A <: OperationTranslator]( } /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.filter("name === 'Fred'") + * }}} + */ + def where(predicate: String): ExpressionOperation[A] = { + filter(predicate) + } + + /** * Groups the elements on some grouping keys. Use this before a selection with aggregations * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. * @@ -144,8 +203,24 @@ case class ExpressionOperation[A <: OperationTranslator]( } /** + * Groups the elements on some grouping keys. Use this before a selection with aggregations + * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + * + * Example: + * + * {{{ + * in.groupBy("key").select("key, value.avg") + * }}} + */ + def groupBy(fields: String): ExpressionOperation[A] = { + val fieldsExpr = ExpressionParser.parseExpressionList(fields) + groupBy(fieldsExpr: _*) + } + + /** * Joins to expression operations. Similar to an SQL join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. + * operations must not overlap, use [[as]] to rename fields if necessary. You can use + * where and select clauses after a join to further specify the behaviour of the join. 
* * Example: * http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala index 2531fff..f108f5c 100644 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala @@ -18,7 +18,6 @@ package org.apache.flink.api.expressions.analysis import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType /** * Analyzer for unary predicates, i.e. filter operations. @@ -28,5 +27,6 @@ class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends new ResolveFieldReferences(inputFields), new InsertAutoCasts, new TypeCheck, + new VerifyNoAggregates, new VerifyBoolean) } http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala new file mode 100644 index 0000000..e9f8788 --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.expressions.analysis + +import org.apache.flink.api.expressions.ExpressionException +import org.apache.flink.api.expressions.tree.{Aggregation, Expression} + +import scala.collection.mutable + +/** + * Rule that verifies that an expression does not contain aggregate operations. Right now, join + * predicates and filter predicates cannot contain aggregates. 
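As an illustration of this rule from the Java side (input is any DataSet backing an expression operation, f0 a tuple field name as used in the tests added by this commit), a predicate like the following would be rejected:

    // Throws ExpressionException: aggregations are not allowed in filter predicates.
    ExpressionUtil.from(input).filter("f0.sum > 5");
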
+ */ +class VerifyNoAggregates extends Rule { + + def apply(expr: Expression) = { + val errors = mutable.MutableList[String]() + + val result = expr.transformPre { + case agg: Aggregation=> { + errors += + s"""Aggregations are not allowed in join/filter predicates.""" + agg + } + } + + if (errors.length > 0) { + throw new ExpressionException( + s"""Invalid expression "$expr": ${errors.mkString(" ")}""") + } + + result + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala new file mode 100644 index 0000000..da53ded --- /dev/null +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.expressions.parser + +import org.apache.flink.api.expressions.ExpressionException +import org.apache.flink.api.expressions.operations.As +import org.apache.flink.api.expressions.tree._ + +import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers} + +/** + * Parser for expressions inside a String. This parses exactly the same expressions that + * would be accepted by the Scala Expression DSL. + * + * See [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and + * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] for the constructs + * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL + * lazy valined in the above files. 
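To give a feel for the grammar defined below, here are a few strings the parser accepts, taken from the docs and tests in this commit (expr stands for any ExpressionOperation; f0/f1 and a/b/c are simply the tuple field names used there):

    expr.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\"");  // arithmetic, aggregates, string concatenation
    expr.select("key, value.avg as average");                    // aliasing with "as"
    expr.filter("a > 1 && b > 1 && c > 1L");                     // comparisons, logic, typed literals
    expr.as("a, b, c");                                          // comma-separated expression lists
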
+ */ +object ExpressionParser extends JavaTokenParsers with PackratParsers { + + // Literals + + lazy val numberLiteral: PackratParser[Expression] = + ((wholeNumber <~ ("L" | "l")) | floatingPointNumber | decimalNumber | wholeNumber) ^^ { + str => + if (str.endsWith("L") || str.endsWith("l")) { + Literal(str.toLong) + } else if (str.matches("""-?\d+""")) { + Literal(str.toInt) + } else if (str.endsWith("f") | str.endsWith("F")) { + Literal(str.toFloat) + } else { + Literal(str.toDouble) + } + } + + lazy val singleQuoteStringLiteral: Parser[Expression] = + ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ { + str => Literal(str.substring(1, str.length - 1)) + } + + lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ { + str => Literal(str.substring(1, str.length - 1)) + } + + lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ { + str => Literal(str.toBoolean) + } + + lazy val literalExpr: PackratParser[Expression] = + numberLiteral | + stringLiteralFlink | singleQuoteStringLiteral | + boolLiteral + + lazy val fieldReference: PackratParser[Expression] = ident ^^ { + case sym => UnresolvedFieldReference(sym) + } + + lazy val atom: PackratParser[Expression] = + ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference + + // suffix ops + lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) } + lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) } + + lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) } + + lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) } + lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) } + lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) } + lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => Count(e) } + lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) } + + lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ { + case e ~ _ ~ as ~ _ => Naming(e, as.name) + } + + lazy val substring: PackratParser[Expression] = + atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ { + case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to) + + } + + lazy val substringWithoutEnd: PackratParser[Expression] = + atom ~ ".substring(" ~ expression ~ ")" ^^ { + case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE)) + + } + + lazy val suffix = + isNull | isNotNull | + abs | sum | min | max | count | avg | + substring | substringWithoutEnd | atom + + + // unary ops + + lazy val unaryNot: PackratParser[Expression] = "!" 
~> suffix ^^ { e => Not(e) } + + lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) } + + lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e => BitwiseNot(e) } + + lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix + + // binary bitwise opts + + lazy val binaryBitwise = unary * ( + "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } | + "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } | + "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } ) + + // arithmetic + + lazy val product = binaryBitwise * ( + "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } | + "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } | + "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } ) + + lazy val term = product * ( + "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } | + "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } ) + + // Comparison + + lazy val equalTo: PackratParser[Expression] = term ~ "===" ~ term ^^ { + case l ~ _ ~ r => EqualTo(l, r) + } + + lazy val equalToAlt: PackratParser[Expression] = term ~ "=" ~ term ^^ { + case l ~ _ ~ r => EqualTo(l, r) + } + + lazy val notEqualTo: PackratParser[Expression] = term ~ "!==" ~ term ^^ { + case l ~ _ ~ r => NotEqualTo(l, r) + } + + lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ { + case l ~ _ ~ r => GreaterThan(l, r) + } + + lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ { + case l ~ _ ~ r => GreaterThanOrEqual(l, r) + } + + lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ { + case l ~ _ ~ r => LessThan(l, r) + } + + lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ { + case l ~ _ ~ r => LessThanOrEqual(l, r) + } + + lazy val comparison: PackratParser[Expression] = + equalTo | equalToAlt | notEqualTo | + greaterThan | greaterThanOrEqual | + lessThan | lessThanOrEqual | term + + // logic + + lazy val logic = comparison * ( + "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } | + "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } ) + + // alias + + lazy val alias: PackratParser[Expression] = logic ~ "as" ~ fieldReference ^^ { + case e ~ _ ~ name => Naming(e, name.name) + } | logic + + lazy val expression: PackratParser[Expression] = alias + + lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",") + + def parseExpressionList(expression: String): List[Expression] = { + parseAll(expressionList, expression) match { + case Success(lst, _) => lst + + case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg) + + case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg) + } + } + + def parseExpression(exprString: String): Expression = { + parseAll(expression, exprString) match { + case Success(lst, _) => lst + + case fail => + throw new ExpressionException("Could not parse expression: " + fail.toString) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala new file mode 100644 index 0000000..ad7cfe4 --- /dev/null +++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.expressions + +import org.apache.flink.api.expressions.ExpressionOperation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.scala.expressions.JavaBatchTranslator +import org.apache.flink.api.scala.expressions.JavaStreamingTranslator +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Convencience methods for creating an [[org.apache.flink.api.expressions.ExpressionOperation]] + * and for converting an [[org.apache.flink.api.expressions.ExpressionOperation]] back + * to a [[org.apache.flink.api.java.DataSet]] or + * [[org.apache.flink.streaming.api.datastream.DataStream]]. + */ +object ExpressionUtil { + + /** + * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]]. + * The fields of the DataSet type are renamed to the given set of fields: + * + * Example: + * + * {{{ + * ExpressionUtil.from(set, "a, b") + * }}} + * + * This will transform the set containing elements of two fields to a table where the fields + * are named a and b. + */ + def from[T](set: DataSet[T], fields: String): ExpressionOperation[JavaBatchTranslator] = { + new JavaBatchTranslator().createExpressionOperation(set, fields) + } + + /** + * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]]. + * The fields of the DataSet type are used to name the + * [[org.apache.flink.api.expressions.ExpressionOperation]] fields. + */ + def from[T](set: DataSet[T]): ExpressionOperation[JavaBatchTranslator] = { + new JavaBatchTranslator().createExpressionOperation(set) + } + + /** + * Transforms the given DataStream to a [[org.apache.flink.api.expressions.ExpressionOperation]]. + * The fields of the DataSet type are renamed to the given set of fields: + * + * Example: + * + * {{{ + * ExpressionUtil.from(set, "a, b") + * }}} + * + * This will transform the set containing elements of two fields to a table where the fields + * are named a and b. + */ + def from[T](set: DataStream[T], fields: String): ExpressionOperation[JavaStreamingTranslator] = { + new JavaStreamingTranslator().createExpressionOperation(set, fields) + } + + /** + * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]]. + * The fields of the DataSet type are used to name the + * [[org.apache.flink.api.expressions.ExpressionOperation]] fields. 
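Taken together, these conversions give the round trip used throughout the new Java tests; a condensed sketch, assuming the 3-tuple test data set and Row result type used in those tests:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Rename the tuple fields on the way in, aggregate per group, then convert back to a DataSet<Row>.
    ExpressionOperation<JavaBatchTranslator> ops =
        ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
    ExpressionOperation<JavaBatchTranslator> result = ops.groupBy("b").select("b, a.sum");
    DataSet<Row> rows = ExpressionUtil.toSet(result, Row.class);
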
+ */ + def from[T](set: DataStream[T]): ExpressionOperation[JavaStreamingTranslator] = { + new JavaStreamingTranslator().createExpressionOperation(set) + } + + /** + * Converts the given [[org.apache.flink.api.expressions.ExpressionOperation]] to + * a DataSet. The given type must have exactly the same fields as the + * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the names of the + * fields and the types must match. + */ + @SuppressWarnings(Array("unchecked")) + def toSet[T]( + op: ExpressionOperation[JavaBatchTranslator], + clazz: Class[T]): DataSet[T] = { + op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataSet[T]] + } + + /** + * Converts the given [[org.apache.flink.api.expressions.ExpressionOperation]] to + * a DataStream. The given type must have exactly the same fields as the + * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the names of the + * fields and the types must match. + */ + @SuppressWarnings(Array("unchecked")) + def toStream[T]( + op: ExpressionOperation[JavaStreamingTranslator], clazz: Class[T]): DataStream[T] = { + op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataStream[T]] + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala index 037efd4..ae41ceb 100644 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.expressions.analysis.ExtractEquiJoinFields import org.apache.flink.api.expressions.operations._ +import org.apache.flink.api.expressions.parser.ExpressionParser import org.apache.flink.api.expressions.runtime.{ExpressionAggregateFunction, ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction} import org.apache.flink.api.expressions.tree._ import org.apache.flink.api.expressions.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo} @@ -43,9 +44,27 @@ class JavaBatchTranslator extends OperationTranslator { type Representation[A] = JavaDataSet[A] + + def createExpressionOperation[A]( + repr: JavaDataSet[A]): ExpressionOperation[JavaBatchTranslator] = { + val fields = + repr.getType.asInstanceOf[CompositeType[A]].getFieldNames.map(UnresolvedFieldReference) + + createExpressionOperation(repr, fields.toArray.asInstanceOf[Array[Expression]], false) + } + + def createExpressionOperation[A]( + repr: JavaDataSet[A], + expression: String): ExpressionOperation[JavaBatchTranslator] = { + val fields = ExpressionParser.parseExpressionList(expression) + + createExpressionOperation(repr, fields.toArray) + } + def createExpressionOperation[A]( repr: JavaDataSet[A], - fields: Array[Expression]): ExpressionOperation[JavaBatchTranslator] = { + fields: Array[Expression], + checkDeterministicFields: Boolean = true): ExpressionOperation[JavaBatchTranslator] = { // shortcut for DataSet[Row] repr.getType match { @@ -72,6 +91,11 @@ 
class JavaBatchTranslator extends OperationTranslator { val inputType = repr.getType.asInstanceOf[CompositeType[A]] + if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) { + throw new ExpressionException(s"You cannot rename fields upon Table creaton: " + + s"Field order of input type $inputType is not deterministic." ) + } + if (fields.length != inputType.getFieldNames.length) { throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") + "' and number of fields in input type " + inputType + " do not match.") @@ -80,7 +104,7 @@ class JavaBatchTranslator extends OperationTranslator { val newFieldNames = fields map { case UnresolvedFieldReference(name) => name case e => - throw new ExpressionException("Only field expressions allowed in 'as' operation, " + + throw new ExpressionException("Only field references allowed in 'as' operation, " + " offending expression: " + e) } @@ -167,6 +191,13 @@ class JavaBatchTranslator extends OperationTranslator { case Root(dataSet: JavaDataSet[Row], resultFields) => dataSet + case Root(_, _) => + throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op) + + case GroupBy(_, fields) => + throw new ExpressionException("Dangling GroupBy operation. Did you forget a " + + "SELECT statement?") + case As(input, newNames) => val translatedInput = translateInternal(input) val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] @@ -333,6 +364,11 @@ class JavaBatchTranslator extends OperationTranslator { val (reducedPredicate, leftFields, rightFields) = ExtractEquiJoinFields(leftType, rightType, predicate) + if (leftFields.isEmpty || rightFields.isEmpty) { + throw new ExpressionException("Could not derive equi-join predicates " + + "for predicate " + predicate + ".") + } + val leftKey = new ExpressionKeys[L](leftFields, leftType) val rightKey = new ExpressionKeys[R](rightFields, rightType) http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala index 095823e..56c38af 100644 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala @@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.expressions.operations._ +import org.apache.flink.api.expressions.parser.ExpressionParser import org.apache.flink.api.expressions.runtime.{ExpressionFilterFunction, ExpressionSelectFunction} import org.apache.flink.api.expressions.tree._ import org.apache.flink.api.expressions.typeinfo.RowTypeInfo @@ -43,6 +44,23 @@ class JavaStreamingTranslator extends OperationTranslator { type Representation[A] = DataStream[A] + + def createExpressionOperation[A]( + repr: DataStream[A]): ExpressionOperation[JavaStreamingTranslator] = { + val fields = + 
repr.getType.asInstanceOf[CompositeType[A]].getFieldNames.map(UnresolvedFieldReference) + + createExpressionOperation(repr, fields.toArray.asInstanceOf[Array[Expression]]) + } + + def createExpressionOperation[A]( + repr: DataStream[A], + expression: String): ExpressionOperation[JavaStreamingTranslator] = { + val fields = ExpressionParser.parseExpressionList(expression) + + createExpressionOperation(repr, fields.toArray) + } + def createExpressionOperation[A]( repr: DataStream[A], fields: Array[Expression]): ExpressionOperation[JavaStreamingTranslator] = { @@ -80,7 +98,7 @@ class JavaStreamingTranslator extends OperationTranslator { val newFieldNames = fields map { case UnresolvedFieldReference(name) => name case e => - throw new ExpressionException("Only field expressions allowed in 'as' operation, " + + throw new ExpressionException("Only field references allowed in 'as' operation, " + " offending expression: " + e) } @@ -166,6 +184,13 @@ class JavaStreamingTranslator extends OperationTranslator { case Root(dataSet: DataStream[Row], resultFields) => dataSet + case Root(_, _) => + throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op) + + case GroupBy(_, fields) => + throw new ExpressionException("Dangling GroupBy operation. Did you forget a " + + "SELECT statement?") + case As(input, newNames) => throw new ExpressionException("As operation for Streams not yet implemented.") http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala index ef25b5b..1f6c397 100644 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala +++ b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala @@ -25,6 +25,9 @@ import scala.language.implicitConversions /** * These are all the operations that can be used to construct an [[Expression]] AST for expression * operations. + * + * These operations must be kept in sync with the parser in + * [[org.apache.flink.api.expressions.parser.ExpressionParser]]. 
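In other words, every DSL construct has a string counterpart; for example, the filter shown in the ExpressionOperation docs can be written either way (Java string form shown, Scala DSL form in the comment):

    // Scala DSL form:  in.filter('name === "Fred")
    in.filter("name === 'Fred'");
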
*/ trait ImplicitExpressionOperations { def expr: Expression @@ -87,6 +90,10 @@ trait ImplicitExpressionConversions { def expr = UnresolvedFieldReference(s.name) } + implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations { + def expr = Literal(l) + } + implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations { def expr = Literal(i) } http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java new file mode 100644 index 0000000..76c7fed --- /dev/null +++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.expressions.test; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.flink.api.expressions.ExpressionException; +import org.apache.flink.api.expressions.ExpressionOperation; +import org.apache.flink.api.expressions.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.expressions.ExpressionUtil; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.scala.expressions.JavaBatchTranslator; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class AggregationsITCase extends MultipleProgramsTestBase { + + + public AggregationsITCase(TestExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testAggregationTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env)); + + ExpressionOperation<JavaBatchTranslator> result = + expressionOperation.select("f0.sum, f0.min, f0.max, f0.count, f0.avg"); + + DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "231,1,21,21,11"; + } + + @Test(expected = ExpressionException.class) + public void testAggregationOnNonExistingField() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env)); + + ExpressionOperation<JavaBatchTranslator> result = + expressionOperation.select("'foo.avg"); + + DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test + public void testWorkingAggregationDataTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements( + new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), + new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao")); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(input); + + ExpressionOperation<JavaBatchTranslator> result = + expressionOperation.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count"); + + DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = 
"1,1,1,1,1.5,1.5,2"; + } + + @Test + public void testAggregationWithArithmetic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSource<Tuple2<Float, String>> input = + env.fromElements( + new Tuple2<Float, String>(1f, "Hello"), + new Tuple2<Float, String>(2f, "Ciao")); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(input); + + ExpressionOperation<JavaBatchTranslator> result = + expressionOperation.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\""); + + + DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "5.5,2 THE COUNT"; + } + + @Test(expected = ExpressionException.class) + public void testNonWorkingDataTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, + "Hello")); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(input); + + ExpressionOperation<JavaBatchTranslator> result = + expressionOperation.select("f1.sum"); + + + DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testNoNestedAggregation() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<Float, String>(1f, "Hello")); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(input); + + ExpressionOperation<JavaBatchTranslator> result = + expressionOperation.select("f0.sum.sum"); + + + DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java new file mode 100644 index 0000000..3b69be0 --- /dev/null +++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.expressions.test; + +import org.apache.flink.api.expressions.ExpressionException; +import org.apache.flink.api.expressions.ExpressionOperation; +import org.apache.flink.api.expressions.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.expressions.ExpressionUtil; +import org.apache.flink.api.scala.expressions.JavaBatchTranslator; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class AsITCase extends MultipleProgramsTestBase { + + + public AsITCase(TestExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expected = ""; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testAs() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, c"); + + DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + } + + @Test(expected = ExpressionException.class) + public void testAsWithToFewFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b"); + + DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testAsWithToManyFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d"); + + DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testAsWithAmbiguousFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + ExpressionOperation<JavaBatchTranslator> 
expressionOperation = + ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, b"); + + DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testAsWithNonFieldReference1() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c"); + + DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } + + @Test(expected = ExpressionException.class) + public void testAsWithNonFieldReference2() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + ExpressionOperation<JavaBatchTranslator> expressionOperation = + ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c"); + + DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, Row.class); + ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + + expected = ""; + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java new file mode 100644 index 0000000..b4d1159 --- /dev/null +++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class CastingITCase extends MultipleProgramsTestBase {
+
+
+    public CastingITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    private String resultPath;
+    private String expected = "";
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() throws Exception{
+        resultPath = tempFolder.newFile().toURI().toString();
+    }
+
+    @After
+    public void after() throws Exception{
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    @Test
+    public void testAutoCastToString() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+                env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>(
+                        (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input);
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select(
+                "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 'f', f5 + \"d\"");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "1b,1s,1i,1L,1.0f,1.0d";
+    }
+
+    @Test
+    public void testNumericAutocastInArithmetic() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+                env.fromElements(new Tuple7<Byte, Short, Integer, Long, Float, Double, String>(
+                        (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input);
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select("f0 + 1, f1 +" +
+                " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "2,2,2,2.0,2.0,2.0";
+    }
+
+    @Test
+    public void testNumericAutocastInComparison() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+                env.fromElements(
+                        new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+                        new Tuple7<Byte, Short, Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello"));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a,b,c,d,e,f,g");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation
+                .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "2,2,2,2,2.0,2.0,Hello";
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
new file mode 100644
index 0000000..5c3a92a
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ExpressionsITCase extends MultipleProgramsTestBase {
+
+
+    public ExpressionsITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    private String resultPath;
+    private String expected = "";
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() throws Exception{
+        resultPath = tempFolder.newFile().toURI().toString();
+    }
+
+    @After
+    public void after() throws Exception{
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    @Test
+    public void testArithmetic() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple2<Integer, Integer>> input =
+                env.fromElements(new Tuple2<Integer, Integer>(5, 10));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select(
+                "a - 5, a + 5, a / 2, a * 2, a % 2, -a");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "0,10,2,10,1,-5";
+    }
+
+    @Test
+    public void testLogic() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple2<Integer, Boolean>> input =
+                env.fromElements(new Tuple2<Integer, Boolean>(5, true));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select(
+                "b && true, b && false, b || false, !b");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "true,false,true,false";
+    }
+
+    @Test
+    public void testComparisons() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple3<Integer, Integer, Integer>> input =
+                env.fromElements(new Tuple3<Integer, Integer, Integer>(5, 5, 4));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b, c");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select(
+                "a > c, a >= b, a < c, a.isNull, a.isNotNull");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "true,true,false,false,true";
+    }
+
+    @Test
+    public void testBitwiseOperation() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple2<Byte, Byte>> input =
+                env.fromElements(new Tuple2<Byte, Byte>((byte) 3, (byte) 5));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select(
+                "a & b, a | b, a ^ b, ~a");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "1,7,6,-4";
+    }
+
+    @Test
+    public void testBitwiseWithAutocast() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple2<Integer, Byte>> input =
+                env.fromElements(new Tuple2<Integer, Byte>(3, (byte) 5));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation.select(
+                "a & b, a | b, a ^ b, ~a");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "1,7,6,-4";
+    }
+
+    @Test(expected = ExpressionException.class)
+    public void testBitwiseWithNonWorkingAutocast() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSource<Tuple2<Float, Byte>> input =
+                env.fromElements(new Tuple2<Float, Byte>(3.0f, (byte) 5));
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b");
+
+        ExpressionOperation<JavaBatchTranslator> result =
+                expressionOperation.select("a & b, a | b, a ^ b, ~a");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "";
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
new file mode 100644
index 0000000..7da5fa3
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class FilterITCase extends MultipleProgramsTestBase {
+
+
+    public FilterITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    private String resultPath;
+    private String expected = "";
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() throws Exception{
+        resultPath = tempFolder.newFile().toURI().toString();
+    }
+
+    @After
+    public void after() throws Exception{
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    @Test
+    public void testAllRejectingFilter() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b, c");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation
+                .filter("false");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "\n";
+    }
+
+    @Test
+    public void testAllPassingFilter() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b, c");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation
+                .filter("true");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+                "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+                "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+                "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+                "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+                "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+    }
+
+    @Test
+    public void testFilterOnIntegerTupleField() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b, c");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation
+                .filter(" a % 2 === 0 ");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+                "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+                "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
    }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
new file mode 100644
index 0000000..8141dea
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
+
+
+    public GroupedAggregationsITCase(TestExecutionMode mode){
+        super(mode);
+    }
+
+    private String resultPath;
+    private String expected = "";
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() throws Exception{
+        resultPath = tempFolder.newFile().toURI().toString();
+    }
+
+    @After
+    public void after() throws Exception{
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    @Test(expected = ExpressionException.class)
+    public void testGroupingOnNonExistentField() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b, c");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation
+                .groupBy("foo").select("a.avg");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "";
+    }
+
+    @Test
+    public void testGroupedAggregate() throws Exception {
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b, c");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation
+                .groupBy("b").select("b, a.sum");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+    }
+
+    @Test
+    public void testGroupingKeyForwardIfNotUsed() throws Exception {
+
+        // the grouping key needs to be forwarded to the intermediate DataSet, even
+        // if we don't want the key in the output
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                ExpressionUtil.from(input, "a, b, c");
+
+        ExpressionOperation<JavaBatchTranslator> result = expressionOperation
+                .groupBy("b").select("a.sum");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
new file mode 100644
index 0000000..3ece3dc
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinITCase extends MultipleProgramsTestBase {
+
+
+    public JoinITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    private String resultPath;
+    private String expected = "";
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() throws Exception {
+        resultPath = tempFolder.newFile().toURI().toString();
+    }
+
+    @After
+    public void after() throws Exception {
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c");
+        ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+        ExpressionOperation<JavaBatchTranslator> result = in1.join(in2).where("b === e").select("c, g");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+    }
+
+    @Test
+    public void testJoinWithFilter() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c");
+        ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+        ExpressionOperation<JavaBatchTranslator> result = in1.join(in2).where("b === e && b < 2").select("c, g");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "Hi,Hallo\n";
+    }
+
+    @Test
+    public void testJoinWithMultipleKeys() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c");
+        ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+        ExpressionOperation<JavaBatchTranslator> result = in1.join(in2).where("a === d && b === h").select("c, g");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+                "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
+    }
+
+    @Test(expected = ExpressionException.class)
+    public void testJoinNonExistingKey() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c");
+        ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+        ExpressionOperation<JavaBatchTranslator> result = in1.join(in2).where("foo === e").select("c, g");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "";
+    }
+
+    @Test(expected = ExpressionException.class)
+    public void testJoinWithNonMatchingKeyTypes() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c");
+        ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+        ExpressionOperation<JavaBatchTranslator> result = in1
+                .join(in2).where("a === g").select("c, g");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "";
+    }
+
+    @Test(expected = ExpressionException.class)
+    public void testJoinWithAmbiguousFields() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c");
+        ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, c");
+
+        ExpressionOperation<JavaBatchTranslator> result = in1
+                .join(in2).where("a === d").select("c, g");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "";
+    }
+
+    @Test
+    public void testJoinWithAggregation() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+        ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(ds1, "a, b, c");
+        ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+        ExpressionOperation<JavaBatchTranslator> result = in1
+                .join(in2).where("a === d").select("g.count");
+
+        DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+        ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+        env.execute();
+
+        expected = "6";
+    }
+
+}
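
Editor's note: the Java ITCases above all exercise the same round trip that this commit adds: ExpressionUtil.from wraps a DataSet in an ExpressionOperation (optionally naming the fields through the new string parser), select/filter take parsed expression strings, and ExpressionUtil.toSet converts the result back into a DataSet of Row. A minimal sketch of that pattern follows. It only uses calls that appear in the tests above; the sample data, field names, output path, and class name (ExpressionApiSketch) are illustrative assumptions, not part of the commit.

import org.apache.flink.api.expressions.ExpressionOperation;
import org.apache.flink.api.expressions.Row;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.expressions.ExpressionUtil;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
import org.apache.flink.core.fs.FileSystem;

public class ExpressionApiSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Illustrative input; the ITCases above use CollectionDataSets instead.
        DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(
                new Tuple3<Integer, Long, String>(1, 1L, "Hi"),
                new Tuple3<Integer, Long, String>(2, 2L, "Hello"));

        // Name the tuple fields via the new expression parser ("a, b, c").
        ExpressionOperation<JavaBatchTranslator> expr = ExpressionUtil.from(input, "a, b, c");

        // Expressions are given as strings and parsed at runtime; "===" is the
        // equality used in the filter/join predicates of the tests above.
        ExpressionOperation<JavaBatchTranslator> result = expr
                .filter("a % 2 === 0")
                .select("a, c");

        // Convert back to a DataSet<Row> and write it out, as the tests do.
        DataSet<Row> rows = ExpressionUtil.toSet(result, Row.class);
        rows.writeAsText("/tmp/expression-api-sketch", FileSystem.WriteMode.OVERWRITE);

        env.execute();
    }
}

As the AsITCase tests at the top of this hunk show, the field list passed to from(...) must consist of plain, unique field references; strings such as "a + 1, b, c" or "a as foo, b, c" are rejected with an ExpressionException.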
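
The grouped-aggregation and join tests additionally exercise groupBy(...).select(...) with ".sum"/".avg"/".count" aggregations and join(...).where(...).select(...) with parsed "===" predicates. A second sketch under the same caveats (the data, field names, output paths, and the class name ExpressionJoinAndAggregateSketch are illustrative; the calls mirror GroupedAggregationsITCase and JoinITCase above):

import org.apache.flink.api.expressions.ExpressionOperation;
import org.apache.flink.api.expressions.Row;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.expressions.ExpressionUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
import org.apache.flink.core.fs.FileSystem;

public class ExpressionJoinAndAggregateSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Illustrative inputs.
        DataSet<Tuple2<Integer, String>> orders = env.fromElements(
                new Tuple2<Integer, String>(1, "apples"),
                new Tuple2<Integer, String>(1, "pears"),
                new Tuple2<Integer, String>(2, "plums"));
        DataSet<Tuple2<Integer, String>> customers = env.fromElements(
                new Tuple2<Integer, String>(1, "Alice"),
                new Tuple2<Integer, String>(2, "Bob"));

        ExpressionOperation<JavaBatchTranslator> in1 = ExpressionUtil.from(orders, "id, item");
        ExpressionOperation<JavaBatchTranslator> in2 = ExpressionUtil.from(customers, "cid, name");

        // Grouped aggregation, as in GroupedAggregationsITCase: groupBy + ".count" aggregation.
        ExpressionOperation<JavaBatchTranslator> counts = in1.groupBy("id").select("id, item.count");

        // Join on a parsed predicate, as in JoinITCase; field names must be unique across both sides.
        ExpressionOperation<JavaBatchTranslator> joined = in1.join(in2).where("id === cid").select("name, item");

        ExpressionUtil.toSet(counts, Row.class)
                .writeAsText("/tmp/expression-counts", FileSystem.WriteMode.OVERWRITE);
        ExpressionUtil.toSet(joined, Row.class)
                .writeAsText("/tmp/expression-join", FileSystem.WriteMode.OVERWRITE);

        env.execute();
    }
}

Field names shared by both join inputs are rejected, as testJoinWithAmbiguousFields above demonstrates, which is why the two sides are renamed to disjoint field lists before joining.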