[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591955#comment-15591955
 ] 

Anton Mushin commented on FLINK-4832:
-------------------------------------

Hello,
I think
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}
also needs to change, because
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is
only called when inputData contains elements:
{code}
        // Excerpt from MapOperatorBase#executeOnCollections. function.map runs once
        // per element of inputData; when inputData is empty the loop body never
        // executes, so nothing is added to result.
        TypeSerializer<IN> inSerializer = 
getOperatorInfo().getInputType().createSerializer(executionConfig);
        TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
        for (IN element : inputData) {
                // Copy the input (and, below, the output) through its serializer,
                // presumably so the user function and the result list never share
                // instances with the caller's collection.
                IN inCopy = inSerializer.copy(element);
                OUT out = function.map(inCopy);
                result.add(outSerializer.copy(out));
        }
{code}
If {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} is
also changed, for example as follows
{code}
override def initiate(partial: Row): Unit = {
    // Start the partial sum at the zero of the element type T, so that a sum
    // over zero rows begins from 0.
    // Note: `0.asInstanceOf[T]` is not a real conversion. T is erased at runtime,
    // so that cast is unchecked and stores a boxed java.lang.Integer even when T
    // is Long/Double/Float/Short/Byte. This can fail later (ClassCastException)
    // or print "0" instead of "0.0". Numeric[T].zero yields a value of the
    // correct runtime type.
    // NOTE(review): assumes SumAggregate is declared with a `T: Numeric` bound —
    // confirm.
    partial.setField(sumIndex, implicitly[Numeric[T]].zero)
  }
{code}
then the following tests pass:
{code}
@Test
  def testDataSetAggregation(): Unit = {
    // Sums the first field of the 3-tuple collection data set through SQL.
    val execEnv = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)

    tableEnv.registerDataSet("MyTable", CollectionDataSets.get3TupleDataSet(execEnv))

    // "231" is the expected total of _1; presumably the ids 1..21 — confirm
    // against CollectionDataSets.get3TupleDataSet.
    val summed = tableEnv.sql("SELECT sum(_1) FROM MyTable").toDataSet[Row].collect()
    TestBaseUtils.compareResultAsText(summed.asJava, "231")
  }

  @Test
  def testSumNullElements(): Unit = {
    // Despite the name, this checks SUM over zero rows. _1 is only 1 or 2 below,
    // so the inner `_1 = 4` filter selects nothing and each SUM must return the
    // zero of its column type.

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env, config)

    val sqlQuery =
      "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
        "FROM (select * from MyTable where _1 = 4)"

    // Column types: Byte, Long, Double, Float, Int, Short.
    // Use uppercase `L`: a lowercase `2l` is easily misread as `21`.
    val ds = env.fromElements(
      (1: Byte, 2L, 1D, 1f, 1, 1: Short),
      (2: Byte, 2L, 1D, 1f, 1, 1: Short))
    tEnv.registerDataSet("MyTable", ds)

    val result = tEnv.sql(sqlQuery)

    // Floating-point columns (_3 Double, _4 Float) render their zero as "0.0".
    val expected = "0,0,0.0,0.0,0,0"
    val results = result.toDataSet[Row].collect()
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}

> Count/Sum 0 elements
> --------------------
>
>                 Key: FLINK-4832
>                 URL: https://issues.apache.org/jira/browse/FLINK-4832
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this, perhaps by unioning the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to