[8/8] flink git commit: [FLINK-2210] Table API support for aggregation on columns with null values

2015-06-16 Thread mxm
[FLINK-2210] Table API support for aggregation on columns with null values


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0914a7d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0914a7d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0914a7d0

Branch: refs/heads/release-0.9
Commit: 0914a7d0a8f564c26b6b9c0f37e8685db5d880ed
Parents: adc3e0e
Author: Shiti 
Authored: Mon Jun 15 00:29:02 2015 +0530
Committer: Maximilian Michels 
Committed: Wed Jun 17 00:51:17 2015 +0200

--
 .../table/codegen/ExpressionCodeGenerator.scala | 19 +++
 .../api/table/expressions/aggregations.scala|  2 +-
 .../api/table/expressions/comparison.scala  |  8 +++
 .../runtime/ExpressionAggregateFunction.scala   |  5 +-
 .../scala/table/test/AggregationsITCase.scala   | 58 +++-
 5 files changed, 88 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0914a7d0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
--
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
index 49f7600..e109574 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
@@ -489,6 +489,25 @@ abstract class ExpressionCodeGenerator[R](
 """.stripMargin
 }
 
+  case NumericIsNotNull(child) =>
+val childCode = generateExpression(child)
+if (nullCheck) {
+  childCode.code +
+s"""
+   |boolean $nullTerm = ${childCode.nullTerm};
+   |if ($nullTerm) {
+   |  0;
+   |} else {
+   |  $resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0;
+   |}
+""".stripMargin
+} else {
+  childCode.code +
+s"""
+   |$resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0;
+""".stripMargin
+}
+
   case _ => throw new ExpressionException("Could not generate code for expression " + expr)
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0914a7d0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
--
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
index 08e319d..a762f66 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
@@ -89,7 +89,7 @@ case class Count(child: Expression) extends Aggregation {
 case class Avg(child: Expression) extends Aggregation {
   override def toString = s"($child).avg"
 
-  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
+  override def getIntermediateFields: Seq[Expression] = Seq(child, NumericIsNotNull(child))
   // This is just sweet. Use our own AST representation and let the code generator do
   // our dirty work.
   override def getFinalField(inputs: Seq[Expression]): Expression =

http://git-wip-us.apache.org/repos/asf/flink/blob/0914a7d0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
--
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index 687ea7a..c60acf9 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -91,3 +91,11 @@ case class IsNotNull(child: Expression) extends UnaryExpression {
 
   override def toString = s"($child).isNotNull"
 }
+
+case class NumericIsNotNull(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+BasicTypeInfo.INT_TYPE_INFO
+  }
+
+  override def toString = s"($child).numericIsNotNull"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0914a7d0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/E

flink git commit: [FLINK-2210] Table API support for aggregation on columns with null values

2015-06-16 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 46ad40588 -> b59c81bc4


[FLINK-2210] Table API support for aggregation on columns with null values


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b59c81bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b59c81bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b59c81bc

Branch: refs/heads/master
Commit: b59c81bc41f0fc4ade5359dfdf42549a76d412fa
Parents: 46ad405
Author: Shiti 
Authored: Mon Jun 15 00:29:02 2015 +0530
Committer: Aljoscha Krettek 
Committed: Tue Jun 16 18:38:48 2015 +0200

--
 .../table/codegen/ExpressionCodeGenerator.scala | 19 +++
 .../api/table/expressions/aggregations.scala|  2 +-
 .../api/table/expressions/comparison.scala  |  8 +++
 .../runtime/ExpressionAggregateFunction.scala   |  5 +-
 .../scala/table/test/AggregationsITCase.scala   | 58 +++-
 5 files changed, 88 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b59c81bc/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
--
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
index 49f7600..e109574 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
@@ -489,6 +489,25 @@ abstract class ExpressionCodeGenerator[R](
 """.stripMargin
 }
 
+  case NumericIsNotNull(child) =>
+val childCode = generateExpression(child)
+if (nullCheck) {
+  childCode.code +
+s"""
+   |boolean $nullTerm = ${childCode.nullTerm};
+   |if ($nullTerm) {
+   |  0;
+   |} else {
+   |  $resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0;
+   |}
+""".stripMargin
+} else {
+  childCode.code +
+s"""
+   |$resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0;
+""".stripMargin
+}
+
   case _ => throw new ExpressionException("Could not generate code for expression " + expr)
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b59c81bc/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
--
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
index 08e319d..a762f66 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
@@ -89,7 +89,7 @@ case class Count(child: Expression) extends Aggregation {
 case class Avg(child: Expression) extends Aggregation {
   override def toString = s"($child).avg"
 
-  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
+  override def getIntermediateFields: Seq[Expression] = Seq(child, NumericIsNotNull(child))
   // This is just sweet. Use our own AST representation and let the code generator do
   // our dirty work.
   override def getFinalField(inputs: Seq[Expression]): Expression =

http://git-wip-us.apache.org/repos/asf/flink/blob/b59c81bc/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
--
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index 687ea7a..c60acf9 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -91,3 +91,11 @@ case class IsNotNull(child: Expression) extends UnaryExpression {
 
   override def toString = s"($child).isNotNull"
 }
+
+case class NumericIsNotNull(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+BasicTypeInfo.INT_TYPE_INFO
+  }
+
+  override def toString = s"($child).numericIsNotNull"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b59c81bc/fli