[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214681480

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ##

@@ -214,10 +214,25 @@ trait ImplicitExpressionOperations {
 def varSamp = VarSamp(expr)
 /**
-* Returns multiset aggregate of a given expression.
+* Returns multiset aggregate of a given expression.
 */
 def collect = Collect(expr)
+ /**
+* Returns a distinct field reference to a given expression
+*/
+ def distinct: Expression = {

Review comment:
Hi @walterddr, I guess what Fabian means is to put this function into `org.apache.flink.table.expressions.Aggregation`, so that `distinct` can only be called on an aggregate. That way we can even remove test cases such as `testDistinctOnNonAggregateExpression`, since the compiler (and therefore the IDE) will report the error directly. What do you think?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
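To illustrate the suggestion above, here is a minimal sketch of how defining `distinct` on the aggregation type alone turns a misuse into a compile error. The classes are simplified, hypothetical stand-ins written for this note (`FieldRef`, `CountOf`, `SumOf`, `DistinctOf` are not the Flink classes), so treat it as an illustration of the idea, not the PR's code.

```scala
// Hypothetical, simplified stand-ins for the Table API expression classes.
sealed trait Expression
final case class FieldRef(name: String) extends Expression

// `distinct` exists only on Aggregation, so FieldRef("a").distinct does not compile.
sealed trait Aggregation extends Expression {
  def distinct: Aggregation = DistinctOf(this)
}
final case class CountOf(child: Expression) extends Aggregation
final case class SumOf(child: Expression) extends Aggregation
final case class DistinctOf(child: Aggregation) extends Aggregation

object DistinctOnAggregationSketch {
  def main(args: Array[String]): Unit = {
    val ok: Expression = CountOf(FieldRef("a")).distinct
    println(ok) // prints DistinctOf(CountOf(FieldRef(a)))
    // FieldRef("a").distinct // rejected by the compiler: no member `distinct`
  }
}
```

Note that in this sketch `DistinctOf` is itself an `Aggregation`, so applying `distinct` twice still compiles; rejecting that case would need a separate check.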
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r214677464

## File path: docs/dev/table/tableApi.md ##

@@ -381,6 +381,36 @@ Table result = orders
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
+{% endhighlight %}
+Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details.
+
+
+
+
+Distinct Aggregation
+Streaming

Review comment:
Cool, thx
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r212824390

## File path: docs/dev/table/tableApi.md ##

@@ -381,6 +381,36 @@ Table result = orders
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
+{% endhighlight %}
+Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details.
+
+
+
+
+Distinct Aggregation
+Streaming

Review comment:
Add a batch label? It seems batch already supports distinct. BTW, `over` is not yet supported in batch.

Maybe it is better to remove this separate documentation about distinct aggregation and instead append a distinct column to all Aggregation examples? For `GroupBy Aggregation`, change the example from

```
Table orders = tableEnv.scan("Orders");
Table result = orders.groupBy("a").select("a, b.sum as d");
```

to

```
Table orders = tableEnv.scan("Orders");
Table result = orders.groupBy("a").select("a, b.sum as d, b.sum.distinct as e");
```

What's more, we should probably add UDAGG documentation, as the SQL documentation on Aggregations does, and include a distinct column there as well.
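For readers comparing the two columns in the proposed example, the following sketch shows the intended difference between `b.sum` and `b.sum.distinct` using plain Scala collections instead of the Table API. The rows and the helper name `sumAndDistinctSum` are made up for illustration, and the sketch assumes that a distinct aggregate folds each distinct value of `b` once per group.

```scala
object DistinctSumSketch {
  // Hypothetical rows of an "Orders" table with columns (a, b).
  val orders: Seq[(String, Int)] =
    Seq(("x", 1), ("x", 1), ("x", 2), ("y", 5), ("y", 5))

  // For each key a: d = sum of b, e = sum over the distinct values of b.
  def sumAndDistinctSum(rows: Seq[(String, Int)]): Map[String, (Int, Int)] =
    rows.groupBy(_._1).map { case (a, group) =>
      val bs = group.map(_._2)
      (a, (bs.sum, bs.distinct.sum))
    }

  def main(args: Array[String]): Unit =
    sumAndDistinctSum(orders).toSeq.sortBy(_._1).foreach(println)
}
```

With these rows, group `x` yields `d = 4` and `e = 3`, and group `y` yields `d = 10` and `e = 5`.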
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r209413346

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ##

@@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
 def getAccumulatorType: TypeInformation[ACC] = null
+
+ private[flink] var isDistinctAgg: Boolean = false
+
+ private[flink] def distinct: AggregateFunction[T, ACC] = {

Review comment:
good catch!
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208549921

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ##

@@ -235,6 +235,13 @@ case class Aggregate(
 groupingExprs.foreach(validateGroupingExpression)
 def validateAggregateExpression(expr: Expression): Unit = expr match {
+ case distinctExpr: DistinctAgg =>
+distinctExpr.child match {
+ case _: AggFunctionCall =>
+failValidation("UDAGG should prepend 'distinct' modifier before arguments")

Review comment:
UDAGG should prepend 'distinct' modifier before arguments, for example, `udagg.distinct('a, 'b)`. Adding an example may make the message easier to understand. :-)
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208549808

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ##

@@ -52,12 +62,49 @@ abstract sealed class Aggregation extends Expression {
 }
+case class DistinctAgg(child: Expression) extends Aggregation {
+
+ override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+child.asInstanceOf[Aggregation].toAggCall(name, isDistinct = true)
+ }
+
+ override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+ override private[flink] def validateInput(): ValidationResult = {
+super.validateInput()
+child match {
+ case agg: Aggregation =>
+child.validateInput()
+ case _ =>
+ValidationFailure(s"Distinct modifier cannot be applied to $child. " +
+ s"It can only be applied to an aggregation expression!")

Review comment:
It can only be applied to an aggregation expression, for example, `'a.count.distinct`, which means `COUNT(DISTINCT a)`. Adding an example may make the message easier to understand?
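As a rough sketch of what the more descriptive validation message could look like, the following self-contained example uses toy types defined only for this note (`Expr`, `Field`, `CountAgg`, `Distinct`, and local `ValidationResult` classes that merely mirror the naming in the diff). It is an assumption-laden illustration, not the code from the PR.

```scala
// Toy validation result types, named after the ones in the diff but defined locally.
sealed trait ValidationResult
case object ValidationSuccess extends ValidationResult
final case class ValidationFailure(message: String) extends ValidationResult

// Hypothetical, simplified expression tree.
sealed trait Expr
final case class Field(name: String) extends Expr
final case class CountAgg(child: Expr) extends Expr
final case class Distinct(child: Expr) extends Expr

object DistinctValidationSketch {
  // Accepts Distinct only when it wraps an aggregation (here: CountAgg).
  def validate(expr: Expr): ValidationResult = expr match {
    case Distinct(_: CountAgg) => ValidationSuccess
    case Distinct(other) =>
      ValidationFailure(
        s"Distinct modifier cannot be applied to $other. " +
          "It can only be applied to an aggregation expression, " +
          "for example, 'a.count.distinct which means COUNT(DISTINCT a).")
    case _ => ValidationSuccess
  }

  def main(args: Array[String]): Unit = {
    println(validate(Distinct(CountAgg(Field("a")))))
    println(validate(Distinct(Field("a"))))
  }
}
```

Including a concrete example in the message tells the user what the valid form looks like instead of only stating the rule.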
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208549873

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ##

@@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
 def getAccumulatorType: TypeInformation[ACC] = null
+
+ private[flink] var isDistinctAgg: Boolean = false
+
+ private[flink] def distinct: AggregateFunction[T, ACC] = {
+this.isDistinctAgg = true

Review comment:
Throw an exception when `isDistinctAgg` is already true? That would let us reject expressions such as `udagg.distinct.distinct('a, 'b)`.
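A minimal sketch of such a guard follows. `UdaggHandle` is a hypothetical stand-in for the aggregate function class, and the exception type and message are assumptions. Unlike the diff, which sets a mutable flag on `this`, the sketch returns a new immutable handle, which is a deliberate simplification.

```scala
// Hypothetical stand-in for a user-defined aggregate function handle.
final class UdaggHandle(val name: String, val isDistinctAgg: Boolean = false) {

  // Returns a distinct variant, failing if the modifier was already applied.
  def distinct: UdaggHandle = {
    if (isDistinctAgg) {
      throw new IllegalStateException(
        s"'distinct' modifier was already applied to $name")
    }
    new UdaggHandle(name, isDistinctAgg = true)
  }
}

object DoubleDistinctSketch {
  def main(args: Array[String]): Unit = {
    val once = new UdaggHandle("weightedAvg").distinct
    println(once.isDistinctAgg)                  // prints true
    println(scala.util.Try(once.distinct).isFailure) // prints true
  }
}
```

Returning a fresh handle also avoids the original function instance staying marked as distinct after a single use, which a shared mutable flag would risk.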
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r208549936

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala ##

@@ -21,11 +21,24 @@ package org.apache.flink.table.api.stream.table.validation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
 class AggregateValidationTest extends TableTestBase {
+ @Test(expected = classOf[ValidationException])
+ def testDistinctModifierOnBothSide(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+val func = new WeightedAvg
+
+val ds = table
+ // must fail. '_foo is not a valid field

Review comment:
Adapt this comment.