[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000530

## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala ##
@@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
 }
+ def rowBasedAggregate(
+groupingExpressions: JList[Expression],

Review comment:
indentation

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000842

## File path: docs/dev/table/tableApi.md ##
@@ -1889,6 +1889,57 @@ tableEnv.registerFunction("func", func);
 Table table = input
 .flatMap("func(c)").as("a, b")
+{% endhighlight %}
+
+Aggregate
+Batch Streaming
+
+Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type.
+{% highlight java %}
+public class MyMinMaxAcc {
+public int min = 0;
+public int max = 0;
+}
+
+public class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {
+
+public void accumulate(MyMinMaxAcc acc, int value) {
+if (value < acc.min) {
+acc.min = value;
+}
+if (value > acc.max) {
+acc.max = value;
+}
+}
+
+@Override
+public MyMinMaxAcc createAccumulator() {
+return new MyMinMaxAcc();
+}
+
+@Override
+public Row getValue(MyMinMaxAcc acc) {
+return Row.of(acc.min, acc.max);
+}
+
+@Override
+public TypeInformation<Row> getResultType() {
+return new RowTypeInfo(Types.INT, Types.INT);
+}
+}
+
+AggregateFunction myAggFunc = new MyMinMax();
+
+tableEnv.registerFunction("myAggFunc", myAggFunc);
+Table table = input
+ .groupBy("key")
+ .aggregate("myAggFunc(a, b) as (x, y, z)")

Review comment:
as (x, y)
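The `as (x, y)` suggestion follows from the example itself: `getValue` returns a row with two fields (min and max), so the alias list needs two names, not three. A minimal plain-Java sketch of that arity check, with no Flink dependency; the class `MinMaxAritySketch` and the helper `aliasesMatch` are hypothetical and only illustrate the point:

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink dependency); all names here are hypothetical.
final class MinMaxAritySketch {

    /** Accumulator mirroring MyMinMaxAcc from the documentation example. */
    static final class Acc {
        int min = 0;
        int max = 0;
    }

    /** Same update rule as MyMinMax.accumulate in the example. */
    static void accumulate(Acc acc, int value) {
        if (value < acc.min) {
            acc.min = value;
        }
        if (value > acc.max) {
            acc.max = value;
        }
    }

    /** Stand-in for getValue: the result has exactly two fields (min, max). */
    static Object[] getValue(Acc acc) {
        return new Object[] {acc.min, acc.max};
    }

    /** True when the number of aliases equals the number of result fields. */
    static boolean aliasesMatch(Object[] result, List<String> aliases) {
        return result.length == aliases.size();
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        for (int v : new int[] {3, -2, 7}) {
            accumulate(acc, v);
        }
        Object[] result = getValue(acc);
        System.out.println(Arrays.toString(result));
        System.out.println(aliasesMatch(result, Arrays.asList("x", "y")));
        System.out.println(aliasesMatch(result, Arrays.asList("x", "y", "z")));
    }
}
```

With two result fields, only the two-name alias list passes the check.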
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281001161

## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala ##
@@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
 // must fail. 'c is not a grouping key or aggregation
 .select('c)
 }
+
+ @Test(expected = classOf[ValidationException])
+ def testTableFunctionInSelection(): Unit = {

Review comment:
This test failed with the exception:
org.apache.flink.table.api.ValidationException: Given parameters of function 'func' do not match any signature.
Actual: (java.lang.Long)
Expected: (java.lang.String)
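The exception quoted above has the expected type, but it is raised by a signature mismatch rather than by the rule the test is named after, so a check on the exception type alone would still pass. One way to guard against this, offered only as a sketch and not as something the PR does, is to also check the message. The helper `failsWith` below is hypothetical and uses plain Java rather than Flink's test utilities:

```java
// Hypothetical helper; not part of Flink's test utilities.
final class ExpectedFailureSketch {

    /** True only if the action throws and the message contains the given fragment. */
    static boolean failsWith(Runnable action, String expectedFragment) {
        try {
            action.run();
            return false;
        } catch (RuntimeException e) {
            return e.getMessage() != null && e.getMessage().contains(expectedFragment);
        }
    }

    public static void main(String[] args) {
        // Simulates the failure quoted in the review comment.
        Runnable signatureMismatch = () -> {
            throw new IllegalStateException(
                "Given parameters of function 'func' do not match any signature.");
        };
        // The action throws, but not for the reason the test is meant to cover.
        System.out.println(failsWith(signatureMismatch, "TableFunction"));
        System.out.println(failsWith(signatureMismatch, "do not match any signature"));
    }
}
```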
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000705

## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala ##
@@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
 }
+ def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+ : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
+
+// extract alias and aggregate function
+var alias: Seq[String] = Seq()
+val aggWithoutAlias = resolvedAggregate match {
+ case c: CallExpression
+if c.getFunctionDefinition.getName == BuiltInFunctionDefinitions.AS.getName => {
+alias = c.getChildren
+ .drop(1)
+ .map(e => e.asInstanceOf[ValueLiteralExpression].getValue.asInstanceOf[String])
+c.getChildren.get(0)
+ }
+ case c: CallExpression
+if c.getFunctionDefinition.isInstanceOf[AggregateFunctionDefinition] => {
+if (alias.isEmpty) alias = UserDefinedFunctionUtils.getFieldInfo(
+ c.getFunctionDefinition.asInstanceOf[AggregateFunctionDefinition].getResultTypeInfo)._1
+c
+ }
+ case e => e

Review comment:
this line can be removed
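The first branch of the match under review treats an `as` call as a wrapper: its first child is the aggregate call and the remaining children are the alias literals, which is what `drop(1)` followed by the cast to `String` expresses. A toy Java model of that step is sketched below; `Call` and its fields are hypothetical stand-ins and not Flink's `CallExpression` API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the alias extraction; Call and its fields are hypothetical
// stand-ins, not Flink's CallExpression API.
final class AliasExtractionSketch {

    /** A call with a function name and children (nested calls or String literals). */
    static final class Call {
        final String name;
        final List<Object> children;

        Call(String name, Object... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /**
     * For as(agg, "x", "y"): skips the first child and collects the alias literals.
     * Assumes an "as" call always has at least one child.
     */
    static List<String> extractAliases(Call call) {
        List<String> aliases = new ArrayList<>();
        if ("as".equals(call.name)) {
            for (Object child : call.children.subList(1, call.children.size())) {
                aliases.add((String) child);
            }
        }
        return aliases;
    }

    /** Returns the wrapped call for an "as" call, otherwise the call itself. */
    static Call stripAlias(Call call) {
        return "as".equals(call.name) ? (Call) call.children.get(0) : call;
    }

    public static void main(String[] args) {
        Call aliased = new Call("as", new Call("myAggFunc", "a", "b"), "x", "y");
        System.out.println(extractAliases(aliased));
        System.out.println(stripAlias(aliased).name);
    }
}
```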
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000442

## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ##
@@ -580,3 +596,30 @@ class OverWindowedTableImpl(
 )
 }
 }
+
+/**
+ * The implementation of a [[AggregatedTable]] that has been grouped on a set of grouping keys.

Review comment:
a -> an
that has been performed on an aggregate function.
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000507

## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ##
@@ -580,3 +596,30 @@ class OverWindowedTableImpl(
 )
 }
 }
+
+/**
+ * The implementation of a [[AggregatedTable]] that has been grouped on a set of grouping keys.
+ */
+class AggregatedTableImpl(
+ private[flink] val table: Table,

Review comment:
indentation
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000356

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java ##
@@ -1058,4 +1058,39 @@
 *
 */
 Table flatMap(Expression tableFunction);
+
+ /**
+* Performs a global aggregate operation with an aggregate function. Use this before a selection
+* to perform the selection operation. The output will be flattened if the output type is a
+* composite type.
+*
+* Example:
+*
+*
+* {@code
+* AggregateFunction aggFunc = new MyAggregateFunction()
+* tableEnv.registerFunction("aggFunc", aggFunc);
+* table.aggregate("aggFunc(a, b) as (f0, f1, f2)")
+* .select("key, f0, f1")

Review comment:
select("f0, f1")?
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000946

## File path: docs/dev/table/tableApi.md ##
@@ -1957,6 +2008,52 @@ class MyFlatMapFunction extends TableFunction[Row] {
 val func = new MyFlatMapFunction
 val table = input
 .flatMap(func('c)).as('a, 'b)
+{% endhighlight %}
+
+Aggregate
+Batch Streaming
+
+Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type.
+{% highlight scala %}
+case class MyMinMaxAcc(var min: Int, var max: Int)
+
+class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
+
+ def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
+if (value < acc.min) {
+ acc.min = value
+}
+if (value > acc.max) {
+ acc.max = value
+}
+ }
+
+ def resetAccumulator(acc: MyMinMaxAcc): Unit = {

Review comment:
The Scala example has resetAccumulator, while the Java example has not.
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000689

## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala ##
@@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
 }
+ def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+ : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
+
+// extract alias and aggregate function
+var alias: Seq[String] = Seq()
+val aggWithoutAlias = resolvedAggregate match {
+ case c: CallExpression
+if c.getFunctionDefinition.getName == BuiltInFunctionDefinitions.AS.getName => {

Review comment:
the brace is not necessary.
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281001183

## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala ##
@@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
 // must fail. 'c is not a grouping key or aggregation
 .select('c)
 }
+
+ @Test(expected = classOf[ValidationException])
+ def testTableFunctionInSelection(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+ .groupBy('a)
+ .aggregate('b.sum as 'd)
+ // must fail. Cannot use TableFunction in select after aggregate
+ .select("func(a)")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+table
+ .groupBy('a)
+ // must fail. Only AggregateFunction can be used in aggregate

Review comment:
The exception message is not friendly for users:
org.apache.flink.table.api.ValidationException: Invalid arguments [log(b), 'd'] for function: as
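The message quoted above mentions the `as` function, not the actual problem: a non-aggregate expression was passed to `aggregate`. One possible remedy, shown purely as an assumption and not as what the PR ended up doing, is to check the kind of the called function first and name the operator in the error. `FunctionKind`, `validateAggregateCall` and the message text below are hypothetical:

```java
// Hypothetical sketch; FunctionKind, validateAggregateCall and the message
// text are illustrative and not taken from the Flink code base.
final class AggregateValidationSketch {

    enum FunctionKind { SCALAR, TABLE, AGGREGATE }

    /** Throws a descriptive error unless the called function is an aggregate function. */
    static void validateAggregateCall(String functionName, FunctionKind kind) {
        if (kind != FunctionKind.AGGREGATE) {
            throw new IllegalArgumentException(
                "Only an AggregateFunction can be used in aggregate(), but '"
                    + functionName + "' is a " + kind + " function.");
        }
    }

    public static void main(String[] args) {
        validateAggregateCall("sum", FunctionKind.AGGREGATE); // passes silently
        try {
            validateAggregateCall("log", FunctionKind.SCALAR);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A message of this shape tells the user which call is at fault and why, without exposing the internal alias handling.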
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281001232

## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala ##
@@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
 // must fail. 'c is not a grouping key or aggregation
 .select('c)
 }
+
+ @Test(expected = classOf[ValidationException])
+ def testTableFunctionInSelection(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+ .groupBy('a)
+ .aggregate('b.sum as 'd)
+ // must fail. Cannot use TableFunction in select after aggregate
+ .select("func(a)")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+table
+ .groupBy('a)
+ // must fail. Only AggregateFunction can be used in aggregate
+ .aggregate('b.log as 'd)
+ .select('a, 'd)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidExpressionInAggregate2(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+ .groupBy('a)
+ // must fail. Only AggregateFunction can be used in aggregate
+ .aggregate("func(c) as d")
+ .select('a, 'd)
+ }
+
+ @Test(expected = classOf[ExpressionParserException])
+ def testMultipleAggregateExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+ .groupBy('a)
+ // must fail. Only AggregateFunction can be used in aggregate

Review comment:
Only AggregateFunction -> Only one AggregateFunction
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000677

## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala ##
@@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
 }
+ def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+ : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)

Review comment:
use resolveExpression?