[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000530
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
   }
 
+  def rowBasedAggregate(
+groupingExpressions: JList[Expression],
 
 Review comment:
   indentation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000842
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1889,6 +1889,57 @@ tableEnv.registerFunction("func", func);
 
 Table table = input
   .flatMap("func(c)").as("a, b")
+{% endhighlight %}
+  
+
+
+
+  
+Aggregate
+Batch Streaming
+  
+  
+Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type.
+{% highlight java %}
+public class MyMinMaxAcc {
+public int min = 0;
+public int max = 0;
+}
+
+public class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> {
+
+public void accumulate(MyMinMaxAcc acc, int value) {
+if (value < acc.min) {
+acc.min = value;
+}
+if (value > acc.max) {
+acc.max = value;
+}
+}
+
+@Override
+public MyMinMaxAcc createAccumulator() {
+return new MyMinMaxAcc();
+}
+
+@Override
+public Row getValue(MyMinMaxAcc acc) {
+return Row.of(acc.min, acc.max);
+}
+
+@Override
+public TypeInformation<Row> getResultType() {
+return new RowTypeInfo(Types.INT, Types.INT);
+}
+}
+
+AggregateFunction myAggFunc = new MyMinMax();
+
+tableEnv.registerFunction("myAggFunc", myAggFunc);
+Table table = input
+  .groupBy("key")
+  .aggregate("myAggFunc(a, b) as (x, y, z)")
 
 Review comment:
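   as (x, y): MyMinMax returns a row of two INT fields (min and max), so only two aliases can be given, not three.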
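A minimal plain-Java sketch of the arity rule behind this comment (no Flink dependency). It assumes, as getResultType() above indicates, that MyMinMax produces a row of two INT fields, named f0 and f1 by RowTypeInfo's defaults, so "as (...)" must supply exactly two names. The helper applyAliases is hypothetical; Flink's actual check and error message may differ.

```java
import java.util.List;

// Hypothetical model of renaming an aggregate's composite output via "as (...)".
// It only captures the arity rule: exactly one alias per field of the result row.
final class AliasArity {

    static List<String> applyAliases(List<String> resultFields, List<String> aliases) {
        if (aliases.size() != resultFields.size()) {
            throw new IllegalArgumentException(
                    "Aggregate returns " + resultFields.size() + " fields " + resultFields
                            + " but " + aliases.size() + " aliases were given: " + aliases);
        }
        // The aliases replace the result field names position by position.
        return List.copyOf(aliases);
    }

    public static void main(String[] args) {
        // MyMinMax's result type: a row of two INT fields (min, max).
        List<String> minMaxFields = List.of("f0", "f1");
        System.out.println(applyAliases(minMaxFields, List.of("x", "y")));
        try {
            applyAliases(minMaxFields, List.of("x", "y", "z"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```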




[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281001161
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala
 ##
 @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
   // must fail. 'c is not a grouping key or aggregation
   .select('c)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testTableFunctionInSelection(): Unit = {
 
 Review comment:
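   This test throws the expected ValidationException, but for a different reason than the one it is meant to cover:
   org.apache.flink.table.api.ValidationException: Given parameters of function 'func' do not match any signature.
   Actual: (java.lang.Long)
   Expected: (java.lang.String)
   i.e. func is called with the Long field 'a while it expects a String, so the failure is not triggered by using a TableFunction in the select after aggregate.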
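Because the test only declares expected = classOf[ValidationException], it still passes when the exception comes from the argument type mismatch. One way to pin the reason down is to check the message as well; in a JUnit 4 test the ExpectedException rule with expectMessage can do this. The plain-Java helper below is a hypothetical, dependency-free sketch of the same idea, with IllegalStateException standing in for ValidationException. Presumably, calling func on the String column c instead of 'a would let the test reach the check it is meant to cover.

```java
// Hypothetical helper: passes only if the action throws the expected type AND
// the message mentions the expected reason, so a test cannot pass by accident.
final class ExpectFailure {

    static <T extends Throwable> T expectThrows(
            Class<T> type, String messageFragment, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (!type.isInstance(t)) {
                throw new AssertionError("Unexpected exception type: " + t, t);
            }
            String message = String.valueOf(t.getMessage());
            if (!message.contains(messageFragment)) {
                throw new AssertionError(
                        "Right exception type, wrong reason. Expected a message containing '"
                                + messageFragment + "' but got: " + message, t);
            }
            return type.cast(t);
        }
        throw new AssertionError("Expected " + type.getSimpleName() + " was not thrown");
    }

    public static void main(String[] args) {
        // IllegalStateException stands in for Flink's ValidationException here.
        Runnable signatureMismatch = () -> {
            throw new IllegalStateException(
                    "Given parameters of function 'func' do not match any signature.");
        };
        try {
            expectThrows(IllegalStateException.class, "TableFunction", signatureMismatch);
        } catch (AssertionError e) {
            // Reports that the exception type matched but the reason did not.
            System.out.println(e.getMessage());
        }
    }
}
```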




[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000705
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
   }
 
+  def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+  : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
+
+// extract alias and aggregate function
+var alias: Seq[String] = Seq()
+val aggWithoutAlias = resolvedAggregate match {
+  case c: CallExpression
+if c.getFunctionDefinition.getName == BuiltInFunctionDefinitions.AS.getName => {
+alias = c.getChildren
+  .drop(1)
+  .map(e => e.asInstanceOf[ValueLiteralExpression].getValue.asInstanceOf[String])
+c.getChildren.get(0)
+  }
+  case c: CallExpression
+if c.getFunctionDefinition.isInstanceOf[AggregateFunctionDefinition] => {
+if (alias.isEmpty) alias = UserDefinedFunctionUtils.getFieldInfo(
+  c.getFunctionDefinition.asInstanceOf[AggregateFunctionDefinition].getResultTypeInfo)._1
+c
+  }
+  case e => e
 
 Review comment:
   this line can be removed
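The alias handling in rowBasedAggregate amounts to: an "as" call supplies the alias names and wraps the actual aggregate call, while a bare aggregate call falls back to the field names of its result type. The Java 16+ sketch below models this with hypothetical Expr records (they are not Flink's Expression classes). Assuming the suggestion is to drop the catch-all "case e => e", it has no pass-through branch and rejects any non-aggregate expression with an explicit message; it also checks the expression wrapped by "as", which the Scala match above does not.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified expression model (not Flink's Expression classes).
interface Expr {}
record AggCall(String name, List<String> resultFields) implements Expr {}
record OtherCall(String name) implements Expr {}
record As(Expr child, List<String> aliases) implements Expr {}

// Result of splitting "agg(...) as (x, y)" into the aggregate call and its aliases.
record AliasedAggregate(AggCall aggregate, List<String> aliases) {}

final class AliasExtraction {

    static AliasedAggregate extract(Expr expr) {
        List<String> aliases = new ArrayList<>();
        Expr inner = expr;
        if (expr instanceof As aliased) {
            // Explicit aliases win; unwrap to the aliased expression.
            aliases.addAll(aliased.aliases());
            inner = aliased.child();
        }
        if (inner instanceof AggCall agg) {
            // No explicit aliases: fall back to the result type's field names.
            if (aliases.isEmpty()) {
                aliases.addAll(agg.resultFields());
            }
            return new AliasedAggregate(agg, List.copyOf(aliases));
        }
        // No catch-all branch: anything that is not an aggregate call is rejected.
        throw new IllegalArgumentException(
                "Only an aggregate function can be used in aggregate, but got: " + inner);
    }
}
```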




[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000442
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -580,3 +596,30 @@ class OverWindowedTableImpl(
 )
   }
 }
+
+/**
+  * The implementation of a [[AggregatedTable]] that has been grouped on a set of grouping keys.
 
 Review comment:
   a -> an, and the description should say it "has been performed on an aggregate function" rather than "has been grouped on a set of grouping keys".
   




[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000507
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -580,3 +596,30 @@ class OverWindowedTableImpl(
 )
   }
 }
+
+/**
+  * The implementation of a [[AggregatedTable]] that has been grouped on a set of grouping keys.
+  */
+class AggregatedTableImpl(
+  private[flink] val table: Table,
 
 Review comment:
   indentation




[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000356
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 ##
 @@ -1058,4 +1058,39 @@
 * 
 */
Table flatMap(Expression tableFunction);
+
+   /**
+* Performs a global aggregate operation with an aggregate function. Use this before a selection
+* to perform the selection operation. The output will be flattened if the output type is a
+* composite type.
+*
+* Example:
+*
+* 
+* {@code
+*   AggregateFunction aggFunc = new MyAggregateFunction()
+*   tableEnv.registerFunction("aggFunc", aggFunc);
+*   table.aggregate("aggFunc(a, b) as (f0, f1, f2)")
+* .select("key, f0, f1")
 
 Review comment:
   Should this be select("f0, f1")? Without a groupBy there is no "key" field in the output of aggregate.
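Which fields can be selected depends on the aggregate's output schema. Assuming that a global aggregate (no groupBy) exposes only the flattened result fields, while a grouped aggregate also keeps its grouping keys, "key" cannot be resolved in this Javadoc example. The hypothetical Java model below encodes just that assumption; it is not Flink's field resolution.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the schema produced by [groupBy(keys).]aggregate(agg as (aliases)).
final class AggregateSchema {

    static List<String> outputFields(List<String> groupingKeys, List<String> aggAliases) {
        List<String> fields = new ArrayList<>(groupingKeys); // empty for a global aggregate
        fields.addAll(aggAliases);                           // flattened composite result
        return fields;
    }

    static void checkSelect(List<String> available, List<String> selected) {
        for (String field : selected) {
            if (!available.contains(field)) {
                throw new IllegalArgumentException(
                        "Cannot resolve field '" + field + "' in " + available);
            }
        }
    }

    public static void main(String[] args) {
        // table.aggregate("aggFunc(a, b) as (f0, f1, f2)")
        List<String> global = outputFields(List.of(), List.of("f0", "f1", "f2"));
        checkSelect(global, List.of("f0", "f1"));            // resolves
        try {
            checkSelect(global, List.of("key", "f0", "f1")); // "key" is not available
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // table.groupBy("key").aggregate(...)
        List<String> grouped = outputFields(List.of("key"), List.of("f0", "f1", "f2"));
        checkSelect(grouped, List.of("key", "f0", "f1"));    // resolves
    }
}
```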


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000946
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1957,6 +2008,52 @@ class MyFlatMapFunction extends TableFunction[Row] {
 val func = new MyFlatMapFunction
 val table = input
   .flatMap(func('c)).as('a, 'b)
+{% endhighlight %}
+  
+
+
+
+  
+Aggregate
+Batch Streaming
+  
+  
+Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type.
+{% highlight scala %}
+case class MyMinMaxAcc(var min: Int, var max: Int)
+
+class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
+
+  def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
+if (value < acc.min) {
+  acc.min = value
+}
+if (value > acc.max) {
+  acc.max = value
+}
+  }
+
+  def resetAccumulator(acc: MyMinMaxAcc): Unit = {
 
 Review comment:
   The Scala example has resetAccumulator, while the Java example does not. The two examples should be consistent.
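One way to make the examples consistent is to give the Java MyMinMax a resetAccumulator method as well (dropping it from the Scala example would be the other option). The sketch below shows the accumulator logic in plain Java without Flink's AggregateFunction base class, so getValue returns an int[] instead of a Row. As a deliberate deviation from the docs example, it starts min and max at Integer.MAX_VALUE and Integer.MIN_VALUE rather than 0: with 0 as the starting minimum, any input made only of positive values would report a minimum of 0.

```java
// Plain-Java sketch of the MyMinMax accumulator logic; in the docs example these
// methods live in a subclass of Flink's AggregateFunction<Row, MyMinMaxAcc>.
final class MyMinMaxAcc {
    int min = Integer.MAX_VALUE; // any real value is smaller
    int max = Integer.MIN_VALUE; // any real value is larger
}

final class MyMinMax {

    MyMinMaxAcc createAccumulator() {
        return new MyMinMaxAcc();
    }

    void accumulate(MyMinMaxAcc acc, int value) {
        if (value < acc.min) {
            acc.min = value;
        }
        if (value > acc.max) {
            acc.max = value;
        }
    }

    // Counterpart of the Scala example's resetAccumulator: restore the initial state.
    void resetAccumulator(MyMinMaxAcc acc) {
        acc.min = Integer.MAX_VALUE;
        acc.max = Integer.MIN_VALUE;
    }

    // Stand-in for getValue(...) returning Row.of(acc.min, acc.max).
    int[] getValue(MyMinMaxAcc acc) {
        return new int[] {acc.min, acc.max};
    }

    public static void main(String[] args) {
        MyMinMax agg = new MyMinMax();
        MyMinMaxAcc acc = agg.createAccumulator();
        for (int v : new int[] {5, 3, 9}) {
            agg.accumulate(acc, v);
        }
        int[] result = agg.getValue(acc); // {3, 9}
        System.out.println(result[0] + ", " + result[1]);
        agg.resetAccumulator(acc);
    }
}
```

Note that an accumulator that has seen no values still reports the sentinel values, so a real implementation may want to track whether any value was accumulated.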




[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000689
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
   }
 
+  def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+  : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
+
+// extract alias and aggregate function
+var alias: Seq[String] = Seq()
+val aggWithoutAlias = resolvedAggregate match {
+  case c: CallExpression
+if c.getFunctionDefinition.getName == BuiltInFunctionDefinitions.AS.getName => {
 
 Review comment:
   The braces are not necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281001183
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala
 ##
 @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
   // must fail. 'c is not a grouping key or aggregation
   .select('c)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testTableFunctionInSelection(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+  .groupBy('a)
+  .aggregate('b.sum as 'd)
+  // must fail. Cannot use TableFunction in select after aggregate
+  .select("func(a)")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+table
+  .groupBy('a)
+  // must fail. Only AggregateFunction can be used in aggregate
 
 Review comment:
   The exception message is not user-friendly:
   org.apache.flink.table.api.ValidationException: Invalid arguments [log(b), 'd'] for function: as
   It complains about the "as" call instead of saying that log(b) is not an AggregateFunction.




[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281001232
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala
 ##
 @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
   // must fail. 'c is not a grouping key or aggregation
   .select('c)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testTableFunctionInSelection(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+  .groupBy('a)
+  .aggregate('b.sum as 'd)
+  // must fail. Cannot use TableFunction in select after aggregate
+  .select("func(a)")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+table
+  .groupBy('a)
+  // must fail. Only AggregateFunction can be used in aggregate
+  .aggregate('b.log as 'd)
+  .select('a, 'd)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidExpressionInAggregate2(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+  .groupBy('a)
+  // must fail. Only AggregateFunction can be used in aggregate
+  .aggregate("func(c) as d")
+  .select('a, 'd)
+  }
+
+  @Test(expected = classOf[ExpressionParserException])
+  def testMultipleAggregateExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+  .groupBy('a)
+  // must fail. Only AggregateFunction can be used in aggregate
 
 Review comment:
   Only AggregateFunction -> Only one AggregateFunction 




[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
URL: https://github.com/apache/flink/pull/8311#discussion_r281000677
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
   }
 
+  def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+  : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
 
 Review comment:
   use resolveExpression?

