[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-09-03 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214681480
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ##
 @@ -214,10 +214,25 @@ trait ImplicitExpressionOperations {
   def varSamp = VarSamp(expr)
 
   /**
-*  Returns multiset aggregate of a given expression.
+* Returns multiset aggregate of a given expression.
 */
   def collect = Collect(expr)
 
+  /**
+* Returns a distinct field reference to a given expression
+*/
+  def distinct: Expression = {
 
 Review comment:
   Hi @walterddr, I guess what Fabian means is putting this function into 
`org.apache.flink.table.expressions.Aggregation`, so that `distinct` can only 
be called on an aggregate. That way we could even remove test cases such as 
`testDistinctOnNonAggregateExpression`, since misuse would already fail at 
compile time and be flagged directly in the IDE. What do you think?
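
   A minimal sketch of the idea above, using simplified stand-in classes 
rather than the real Flink ones (`FieldRef`, `Sum`, `Count` and the object name 
are hypothetical). Only aggregations expose `distinct`, so calling it on a 
plain field reference is rejected by the compiler:

```scala
// Simplified stand-ins for the Table API expression classes; not the real Flink code.
object DistinctPlacementSketch {
  sealed trait Expression
  final case class FieldRef(name: String) extends Expression

  // `distinct` lives on Aggregation only, so non-aggregate expressions do not have it.
  sealed trait Aggregation extends Expression {
    def distinct: Expression = DistinctAgg(this)
  }
  final case class Sum(child: Expression) extends Aggregation
  final case class Count(child: Expression) extends Aggregation

  // Wrapper that marks an aggregation as DISTINCT.
  final case class DistinctAgg(child: Aggregation) extends Expression

  // Compiles: `distinct` is applied to an aggregation.
  val ok: Expression = Count(FieldRef("a")).distinct

  // Does not compile: `distinct` is not a member of FieldRef.
  // val bad = FieldRef("a").distinct
}
```

   In this sketch `DistinctAgg` is deliberately not an `Aggregation` itself, 
so `.distinct.distinct` would not compile either. That is a choice made for the 
sketch and differs from the PR, where `DistinctAgg` extends `Aggregation`.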


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-09-03 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214677464
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -381,6 +381,36 @@ Table result = orders
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
+{% endhighlight %}
+Note: For streaming queries the required state to compute 
the query result might grow infinitely depending on the number of distinct 
fields. Please provide a query configuration with valid retention interval to 
prevent excessive state size. See Streaming 
Concepts for details.
+  
+
+
+  
+Distinct Aggregation
+Streaming
 
 Review comment:
   Cool, thx




[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-26 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r212824390
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -381,6 +381,36 @@ Table result = orders
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
+{% endhighlight %}
+Note: For streaming queries the required state to compute 
the query result might grow infinitely depending on the number of distinct 
fields. Please provide a query configuration with valid retention interval to 
prevent excessive state size. See Streaming 
Concepts for details.
+  
+
+
+  
+Distinct Aggregation
+Streaming
 
 Review comment:
   Add a batch label? It seems batch already supports distinct. BTW, over is 
not yet supported in batch.
   
   Maybe it would be better to remove these documents about distinct 
aggregation and instead append a distinct column to all the Aggregation 
examples?
   For `GroupBy Aggregation`, change the example 
   from
   ```
   Table orders = tableEnv.scan("Orders");
   Table result = orders.groupBy("a").select("a, b.sum as d");
   ```
   to
   ```
   Table orders = tableEnv.scan("Orders");
   Table result = orders.groupBy("a").select("a, b.sum as d, b.sum.distinct as e");
   ```
   What's more, we should probably add UDAGG documentation, like the one in the 
SQL documentation about Aggregations, and add a distinct column to it.
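
   To illustrate what the extra `b.sum.distinct as e` column would compute 
next to `b.sum as d`, here is a small sketch on plain Scala collections. It 
does not use the Table API at all; the sample rows and the object name are 
made up for illustration:

```scala
// Plain-Scala illustration of SUM(b) vs. SUM(DISTINCT b) per group; not Flink code.
object DistinctSumSketch {
  // Hypothetical rows of (a, b), standing in for the "Orders" table.
  val orders: Seq[(String, Int)] =
    Seq(("x", 1), ("x", 1), ("x", 2), ("y", 5), ("y", 5))

  // For each key a: (sum of all b values, sum of the distinct b values).
  val result: Map[String, (Int, Int)] =
    orders.groupBy(_._1).map { case (a, rows) =>
      val bs = rows.map(_._2)
      a -> ((bs.sum, bs.distinct.sum))
    }
}
```

   For key `x` the duplicate value 1 is counted twice in the plain sum and 
once in the distinct sum, which is the difference the extra column would show.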




[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-10 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r209413346
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ##
 @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
   def getAccumulatorType: TypeInformation[ACC] = null
+
+  private[flink] var isDistinctAgg: Boolean = false
+
+  private[flink] def distinct: AggregateFunction[T, ACC] = {
 
 Review comment:
   good catch!




[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-08 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208549921
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ##
 @@ -235,6 +235,13 @@ case class Aggregate(
 groupingExprs.foreach(validateGroupingExpression)
 
 def validateAggregateExpression(expr: Expression): Unit = expr match {
+  case distinctExpr: DistinctAgg =>
+distinctExpr.child match {
+  case _: AggFunctionCall =>
+failValidation("UDAGG should prepend 'distinct' modifier before arguments")
 
 Review comment:
   UDAGG should prepend 'distinct' modifier before arguments, for example, 
udagg.distinct('a, 'b).
   Adding an example to the message may make it easier to understand. :-)
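
   A rough sketch of how the suggested message could look in a validation 
step. The expression classes below are simplified stand-ins, `BuiltInAgg` and 
`validate` are hypothetical names, and returning an `Option[String]` replaces 
the real `failValidation` call purely to keep the sketch self-contained:

```scala
// Simplified stand-ins for the logical expression classes; not the real Flink code.
object UdaggDistinctMessageSketch {
  sealed trait Expression
  final case class AggFunctionCall(name: String, args: Seq[String]) extends Expression
  final case class BuiltInAgg(name: String, arg: String) extends Expression
  final case class DistinctAgg(child: Expression) extends Expression

  // Returns Some(error message) when validation fails, None otherwise.
  def validate(expr: Expression): Option[String] = expr match {
    // `distinct` was appended after the UDAGG call instead of before its arguments.
    case DistinctAgg(_: AggFunctionCall) =>
      Some("UDAGG should prepend 'distinct' modifier before arguments, " +
        "for example, udagg.distinct('a, 'b).")
    case _ => None
  }
}
```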




[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-08 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208549808
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 ##
 @@ -52,12 +62,49 @@ abstract sealed class Aggregation extends Expression {
 
 }
 
+case class DistinctAgg(child: Expression) extends Aggregation {
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+child.asInstanceOf[Aggregation].toAggCall(name, isDistinct = true)
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+super.validateInput()
+child match {
+  case agg: Aggregation =>
+child.validateInput()
+  case _ =>
+ValidationFailure(s"Distinct modifier cannot be applied to $child. " +
+  s"It can only be applied to an aggregation expression!")
 
 Review comment:
   It can only be applied to an aggregation expression, for example, 
'a.count.distinct which means COUNT(DISTINCT a).
   Would adding an example to the message make it easier to understand?




[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-08 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208549873
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ##
 @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
   def getAccumulatorType: TypeInformation[ACC] = null
+
+  private[flink] var isDistinctAgg: Boolean = false
+
+  private[flink] def distinct: AggregateFunction[T, ACC] = {
+this.isDistinctAgg = true
 
 Review comment:
   Throw an exception when `isDistinctAgg` is already true? That would let us 
reject expressions such as `udagg.distinct.distinct('a, 'b)`.
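
   A minimal sketch of that check, assuming a simplified stand-in class 
(`Udagg` is hypothetical and is not Flink's `AggregateFunction`). The 
exception type is a placeholder for whatever the project would actually throw:

```scala
// Hypothetical stand-in for a user-defined aggregate function; not Flink's AggregateFunction.
object DoubleDistinctSketch {
  class Udagg {
    var isDistinctAgg: Boolean = false

    // Marks the function as distinct; a second call is rejected instead of silently accepted.
    def distinct: Udagg = {
      if (isDistinctAgg) {
        // IllegalStateException is a placeholder exception type for this sketch.
        throw new IllegalStateException("The 'distinct' modifier has already been applied.")
      }
      isDistinctAgg = true
      this
    }
  }
}
```

   With this guard, the first `distinct` sets the flag and the second one 
fails, so a chain like `udagg.distinct.distinct` is caught as soon as it is 
built.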




[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-08 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208549936
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala
 ##
 @@ -21,11 +21,24 @@ package org.apache.flink.table.api.stream.table.validation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
 
 class AggregateValidationTest extends TableTestBase {
 
+  @Test(expected = classOf[ValidationException])
+  def testDistinctModifierOnBothSide(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+val func = new WeightedAvg
+
+val ds = table
+  // must fail. '_foo is not a valid field
 
 Review comment:
   Please adapt this comment so that it describes what this test checks.

