[
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591955#comment-15591955
]
Anton Mushin edited comment on FLINK-4832 at 10/20/16 2:31 PM:
---
Hello,
I think that
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}
also needs to change, because
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is
only called when inputData contains elements:
{code}
// Serializers used to copy each input element before it is passed to map()
// and to copy each mapped result before it is added to `result`.
TypeSerializer inSerializer =
getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer outSerializer =
getOperatorInfo().getOutputType().createSerializer(executionConfig);
// function.map(...) is invoked once per element of inputData, so when
// inputData is empty map() is never called and nothing is added to `result`.
for (IN element : inputData) {
IN inCopy = inSerializer.copy(element);
OUT out = function.map(inCopy);
result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}}
is changed, for example, to:
{code}
/**
 * Seeds the running sum in `partial` with zero, as proposed so that SUM over
 * an empty input produces 0.
 *
 * @param partial the intermediate row whose `sumIndex` field holds the running sum
 */
override def initiate(partial: Row): Unit = {
  // Cast 0 to the element type T of the concrete class that extends SumAggregate[T].
  // NOTE(review): T is erased, so this cast is unchecked and stores a boxed
  // java.lang.Integer regardless of T; confirm that subclasses with T other than Int
  // (Long, Double, Float, ...) accept it, or use a type-specific zero instead.
  partial.setField(sumIndex, 0.asInstanceOf[T])
}
{code}
then the following tests pass:
{code}
/**
 * SUM over an empty input is expected to yield 0 for every numeric column type.
 * The filter `_1 = 4` matches none of the registered rows.
 */
@Test
def testSumNullElements(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)
  // No row has _1 = 4, so every SUM aggregates zero input rows.
  val sqlQuery =
    "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
    "FROM (select * from MyTable where _1 = 4)"
  // Columns: Byte, Long, Double, Float, Int, Short.
  // Uppercase `L` avoids the easily misread (and deprecated) `2l` literal.
  val ds = env.fromElements(
    (1: Byte, 2L, 1D, 1f, 1, 1: Short),
    (2: Byte, 2L, 1D, 1f, 1, 1: Short))
  tEnv.registerDataSet("MyTable", ds)
  val result = tEnv.sql(sqlQuery)
  val expected = "0,0,0.0,0.0,0,0"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
/**
 * COUNT over an empty input is expected to yield 0 for every column.
 * The filter `_1 = 4` matches none of the registered rows.
 */
@Test
def testCountNullElements(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)
  // No row has _1 = 4, so every COUNT aggregates zero input rows.
  val sqlQuery =
    "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " +
    "FROM (select * from MyTable where _1 = 4)"
  // Columns: Byte, Long, Double, Float, Int, Short.
  // Uppercase `L` avoids the easily misread (and deprecated) `2l` literal.
  val ds = env.fromElements(
    (1: Byte, 2L, 1D, 1f, 1, 1: Short),
    (2: Byte, 2L, 1D, 1f, 1, 1: Short))
  tEnv.registerDataSet("MyTable", ds)
  val result = tEnv.sql(sqlQuery)
  val expected = "0,0,0,0,0,0"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}
was (Author: anmu):
Hello,
I think that
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}
also needs to change, because
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is
only called when inputData contains elements:
{code}
// Serializers used to copy each input element before it is passed to map()
// and to copy each mapped result before it is added to `result`.
TypeSerializer inSerializer =
getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer outSerializer =
getOperatorInfo().getOutputType().createSerializer(executionConfig);
// function.map(...) is invoked once per element of inputData, so when
// inputData is empty map() is never called and nothing is added to `result`.
for (IN element : inputData) {
IN inCopy = inSerializer.copy(element);
OUT out = function.map(inCopy);
result.add(outSerializer.copy(out));
}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}}
is changed, for example, to:
{code}
/**
 * Seeds the running sum in `partial` with zero, as proposed so that SUM over
 * an empty input produces 0.
 *
 * @param partial the intermediate row whose `sumIndex` field holds the running sum
 */
override def initiate(partial: Row): Unit = {
  // Cast 0 to the element type T of the concrete class that extends SumAggregate[T].
  // NOTE(review): T is erased, so this cast is unchecked and stores a boxed
  // java.lang.Integer regardless of T; confirm that subclasses with T other than Int
  // (Long, Double, Float, ...) accept it, or use a type-specific zero instead.
  partial.setField(sumIndex, 0.asInstanceOf[T])
}
{code}
then the following tests pass:
{code}
/** SUM over the first column of the 3-tuple collection data set is expected to be 231. */
@Test
def testDataSetAggregation(): Unit = {
  val execEnv = ExecutionEnvironment.getExecutionEnvironment
  val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
  tableEnv.registerDataSet("MyTable", CollectionDataSets.get3TupleDataSet(execEnv))
  val collected = tableEnv
    .sql("SELECT sum(_1) FROM MyTable")
    .toDataSet[Row]
    .collect()
  TestBaseUtils.compareResultAsText(collected.asJava, "231")
}
/**
 * SUM over an empty input is expected to yield 0 for every numeric column type.
 * The filter `_1 = 4` matches none of the registered rows.
 */
@Test
def testSumNullElements(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)
  // No row has _1 = 4, so every SUM aggregates zero input rows.
  val sqlQuery =
    "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
    "FROM (select * from MyTable where _1 = 4)"
  // Columns: Byte, Long, Double, Float, Int, Short.
  // Uppercase `L` avoids the easily misread (and deprecated) `2l` literal.
  val ds = env.fromElements(
    (1: Byte, 2L, 1D, 1f, 1, 1: Short),
    (2: Byte, 2L, 1D, 1f, 1, 1: Short))
  tEnv.registerDataSet("MyTable", ds)
  val result = tEnv.sql(sqlQuery)
  val expected = "0,0,0.0,0.0,0,0"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}
> Count/Sum 0 elements
>
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project