[8/8] flink git commit: [FLINK-2210] Table API support for aggregation on columns with null values
[FLINK-2210] Table API support for aggregation on columns with null values Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0914a7d0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0914a7d0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0914a7d0 Branch: refs/heads/release-0.9 Commit: 0914a7d0a8f564c26b6b9c0f37e8685db5d880ed Parents: adc3e0e Author: Shiti Authored: Mon Jun 15 00:29:02 2015 +0530 Committer: Maximilian Michels Committed: Wed Jun 17 00:51:17 2015 +0200 -- .../table/codegen/ExpressionCodeGenerator.scala | 19 +++ .../api/table/expressions/aggregations.scala| 2 +- .../api/table/expressions/comparison.scala | 8 +++ .../runtime/ExpressionAggregateFunction.scala | 5 +- .../scala/table/test/AggregationsITCase.scala | 58 +++- 5 files changed, 88 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0914a7d0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala -- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala index 49f7600..e109574 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala @@ -489,6 +489,25 @@ abstract class ExpressionCodeGenerator[R]( """.stripMargin } + case NumericIsNotNull(child) => +val childCode = generateExpression(child) +if (nullCheck) { + childCode.code + +s""" + |boolean $nullTerm = ${childCode.nullTerm}; + |if ($nullTerm) { + | 0; + |} else { + | $resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0; + |} +""".stripMargin +} else { + childCode.code + +s""" + |$resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0; +""".stripMargin +} + case _ => throw new ExpressionException("Could not generate code for expression " + expr) } http://git-wip-us.apache.org/repos/asf/flink/blob/0914a7d0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala -- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala index 08e319d..a762f66 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala @@ -89,7 +89,7 @@ case class Count(child: Expression) extends Aggregation { case class Avg(child: Expression) extends Aggregation { override def toString = s"($child).avg" - override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1)) + override def getIntermediateFields: Seq[Expression] = Seq(child, NumericIsNotNull(child)) // This is just sweet. Use our own AST representation and let the code generator do // our dirty work. override def getFinalField(inputs: Seq[Expression]): Expression = http://git-wip-us.apache.org/repos/asf/flink/blob/0914a7d0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala -- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala index 687ea7a..c60acf9 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala @@ -91,3 +91,11 @@ case class IsNotNull(child: Expression) extends UnaryExpression { override def toString = s"($child).isNotNull" } + +case class NumericIsNotNull(child: Expression) extends UnaryExpression { + def typeInfo = { +BasicTypeInfo.INT_TYPE_INFO + } + + override def toString = s"($child).numericIsNotNull" +} http://git-wip-us.apache.org/repos/asf/flink/blob/0914a7d0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/E
flink git commit: [FLINK-2210] Table API support for aggregation on columns with null values
Repository: flink Updated Branches: refs/heads/master 46ad40588 -> b59c81bc4 [FLINK-2210] Table API support for aggregation on columns with null values Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b59c81bc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b59c81bc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b59c81bc Branch: refs/heads/master Commit: b59c81bc41f0fc4ade5359dfdf42549a76d412fa Parents: 46ad405 Author: Shiti Authored: Mon Jun 15 00:29:02 2015 +0530 Committer: Aljoscha Krettek Committed: Tue Jun 16 18:38:48 2015 +0200 -- .../table/codegen/ExpressionCodeGenerator.scala | 19 +++ .../api/table/expressions/aggregations.scala| 2 +- .../api/table/expressions/comparison.scala | 8 +++ .../runtime/ExpressionAggregateFunction.scala | 5 +- .../scala/table/test/AggregationsITCase.scala | 58 +++- 5 files changed, 88 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b59c81bc/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala -- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala index 49f7600..e109574 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala @@ -489,6 +489,25 @@ abstract class ExpressionCodeGenerator[R]( """.stripMargin } + case NumericIsNotNull(child) => +val childCode = generateExpression(child) +if (nullCheck) { + childCode.code + +s""" + |boolean $nullTerm = ${childCode.nullTerm}; + |if ($nullTerm) { + | 0; + |} else { + | $resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0; + |} +""".stripMargin +} else { + childCode.code + +s""" + |$resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0; +""".stripMargin +} + case _ => throw new ExpressionException("Could not generate code for expression " + expr) } http://git-wip-us.apache.org/repos/asf/flink/blob/b59c81bc/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala -- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala index 08e319d..a762f66 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala @@ -89,7 +89,7 @@ case class Count(child: Expression) extends Aggregation { case class Avg(child: Expression) extends Aggregation { override def toString = s"($child).avg" - override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1)) + override def getIntermediateFields: Seq[Expression] = Seq(child, NumericIsNotNull(child)) // This is just sweet. Use our own AST representation and let the code generator do // our dirty work. override def getFinalField(inputs: Seq[Expression]): Expression = http://git-wip-us.apache.org/repos/asf/flink/blob/b59c81bc/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala -- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala index 687ea7a..c60acf9 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala @@ -91,3 +91,11 @@ case class IsNotNull(child: Expression) extends UnaryExpression { override def toString = s"($child).isNotNull" } + +case class NumericIsNotNull(child: Expression) extends UnaryExpression { + def typeInfo = { +BasicTypeInfo.INT_TYPE_INFO + } + + override def toString = s"($child).numericIsNotNull" +} http://git-wip-us.apache.org/repos/asf/flink/blob/b59c81bc/fli