Revert "[FLINK-2210] Table API support for aggregation on columns with null values"
This reverts commit b59c81bc41f0fc4ade5359dfdf42549a76d412fa. The commit had to be reverted because the RowSerializer is not in sync with other comparators and serializers. See FLINK-2236. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff0a1a0b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff0a1a0b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff0a1a0b Branch: refs/heads/master Commit: ff0a1a0b57ea239ef05d240ca9d9d5bd20d458ec Parents: b08669a Author: Maximilian Michels <m...@apache.org> Authored: Tue Oct 6 12:34:19 2015 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Tue Oct 6 17:16:55 2015 +0200 ---------------------------------------------------------------------- .../table/codegen/ExpressionCodeGenerator.scala | 19 ------- .../api/table/expressions/aggregations.scala | 2 +- .../api/table/expressions/comparison.scala | 8 --- .../runtime/ExpressionAggregateFunction.scala | 5 +- .../scala/table/test/AggregationsITCase.scala | 58 +------------------- 5 files changed, 4 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala index 43396d9..c9220e9 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala @@ -514,25 +514,6 @@ abstract class ExpressionCodeGenerator[R]( """.stripMargin } - case NumericIsNotNull(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code + - s""" - |boolean $nullTerm = ${childCode.nullTerm}; - |if ($nullTerm) { - | 0; - |} else { - | $resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0; - |} - """.stripMargin - } else { - childCode.code + - s""" - |$resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0; - """.stripMargin - } - case _ => throw new ExpressionException("Could not generate code for expression " + expr) } http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala index a762f66..08e319d 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala @@ -89,7 +89,7 @@ case class Count(child: Expression) extends Aggregation { case class Avg(child: Expression) extends Aggregation { override def toString = s"($child).avg" - override def getIntermediateFields: Seq[Expression] = Seq(child, NumericIsNotNull(child)) + override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1)) // This is just sweet. Use our own AST representation and let the code generator do // our dirty work. override def getFinalField(inputs: Seq[Expression]): Expression = http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala index c60acf9..687ea7a 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala @@ -91,11 +91,3 @@ case class IsNotNull(child: Expression) extends UnaryExpression { override def toString = s"($child).isNotNull" } - -case class NumericIsNotNull(child: Expression) extends UnaryExpression { - def typeInfo = { - BasicTypeInfo.INT_TYPE_INFO - } - - override def toString = s"($child).numericIsNotNull" -} http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala index 7d7dc1c..7e9bc0d 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala @@ -53,10 +53,7 @@ class ExpressionAggregateFunction( var i = 0 val len = functions.length while (i < len) { - val element: Any = current.productElement(fieldPositions(i)) - if (element != null){ - functions(i).aggregate(element) - } + functions(i).aggregate(current.productElement(fieldPositions(i))) i += 1 } } http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala index acbeab7..7ac8eef 100644 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala +++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala @@ -18,16 +18,13 @@ package org.apache.flink.api.scala.table.test -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.table.{Row, ExpressionException} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.typeinfo.RowTypeInfo -import org.apache.flink.api.table.{ExpressionException, Row} import org.apache.flink.core.fs.FileSystem.WriteMode +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.Assert._ import org.junit._ import org.junit.rules.TemporaryFolder import org.junit.runner.RunWith @@ -127,56 +124,5 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa expected = "" } - @Test - def testAggregationWithNullValues(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val dataSet = env.fromElements[(Integer, String)]( - (123, "a"), (234, "b"), (345, "c"), (0, "d")) - - implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo( - Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name")) - - val rowDataSet = dataSet.map { - entry => - val row = new Row(2) - val amount = if (entry._1 > 200) entry._1 else null - row.setField(0, amount) - row.setField(1, entry._2) - row - } - - val entries = rowDataSet.toTable.select('id.avg, 'id.sum, 'id.count).collect().head - val mean = entries.productElement(0).toString.toInt - val sum = entries.productElement(1).toString.toInt - val count = entries.productElement(2).toString.toInt - - assertEquals(4,count) - - val computedMean = sum / 2 - assertEquals(computedMean, mean) - } - - @Test - def testAggregationWhenAllValuesAreNull(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val dataSet = env.fromElements[(Integer, String)]( - (123, "a"), (234, "b"), (345, "c"), (0, "d")) - - implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo( - Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name")) - - val rowDataSet = dataSet.map { - entry => - val row = new Row(2) - row.setField(0, null) - row.setField(1, entry._2) - row - } - - val entries = rowDataSet.toTable.select('id.max).collect().head.productElement(0) - assertEquals(entries, null) - } }