Hi everybody,
Could you please explain issue https://issues.apache.org/jira/browse/FLINK-4832?
In short, I chose a different approach to resolve this issue than the one
proposed in the issue description.
In
`org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections`
I added the following code:

    if (inputData.size() == 0) {
        IN inCopy = inSerializer.createInstance();
        OUT out = function.map(inCopy);
        result.add(outSerializer.copy(out));
    }
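
To make the effect of this change easier to discuss, here is a minimal standalone sketch in plain Java, without any Flink classes. `Supplier<IN>` stands in for `inSerializer.createInstance()` and `Function<IN, OUT>` for the map function. The class and method names are hypothetical, and the serializer copies are left out. It only illustrates that, with the extra branch, an empty input yields exactly one output record computed from a default instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the collection-execution path of a map operator.
class EmptyInputMapSketch {

    static <IN, OUT> List<OUT> executeOnCollections(
            List<IN> inputData,
            Function<IN, OUT> function,
            Supplier<IN> createInstance) {
        List<OUT> result = new ArrayList<>(inputData.size());
        for (IN element : inputData) {
            result.add(function.apply(element));
        }
        // The added branch: on empty input, map a default instance once.
        if (inputData.isEmpty()) {
            result.add(function.apply(createInstance.get()));
        }
        return result;
    }

    public static void main(String[] args) {
        Function<Integer, String> toText = i -> "value=" + i;
        // Empty input: one record built from the default instance (0 here).
        System.out.println(executeOnCollections(
                Collections.<Integer>emptyList(), toText, () -> 0)); // [value=0]
        // Non-empty input: unchanged behaviour.
        System.out.println(executeOnCollections(
                Arrays.asList(1, 2), toText, () -> 0)); // [value=1, value=2]
    }
}
```

Because the branch sits in the generic map operator, it would presumably apply to every map function executed on collections, not only to the aggregations, which may be worth considering when reviewing this approach.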
I also changed
`org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate` to:

    override def initiate(partial: Row): Unit = {
      // Cast 0 to the type T; each [Type]SumAggregate class extends SumAggregate[T].
      partial.setField(sumIndex, 0.asInstanceOf[T])
    }
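
One point that may be worth checking with `0.asInstanceOf[T]`: because of type erasure, a cast to a type parameter does not convert the value, so the field may end up holding a boxed `Int` even in, say, the `Long` variant. The sketch below shows the same effect in plain Java, together with one possible alternative: passing a typed zero to each concrete aggregate. `SumSketch`, `erasedZero`, and the constructor argument are hypothetical names, not Flink API.

```java
// Hypothetical sketch; this is not the Flink SumAggregate class.
class SumSketch<T extends Number> {

    // Typed zero supplied by each concrete aggregate (e.g. 0L for Long).
    private final T zero;

    SumSketch(T zero) {
        this.zero = zero;
    }

    // Analogue of 0.asInstanceOf[T]: the unchecked cast is erased at runtime,
    // so the returned object is still a java.lang.Integer.
    @SuppressWarnings("unchecked")
    static <U> U erasedZero() {
        return (U) (Object) Integer.valueOf(0);
    }

    // Alternative: return the zero whose runtime class matches T.
    T initiate() {
        return zero;
    }

    public static void main(String[] args) {
        Object viaCast = SumSketch.<Long>erasedZero();
        Object viaTyped = new SumSketch<>(0L).initiate();
        System.out.println(viaCast.getClass().getName());  // java.lang.Integer
        System.out.println(viaTyped.getClass().getName()); // java.lang.Long
    }
}
```

If the initial value is later read back as the concrete type (for example as a `Long`), the erased variant could fail with a `ClassCastException`, so a per-type zero looks like the safer choice.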
With these changes, the following code now executes correctly:

    val sqlQuery =
      "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
      "FROM (select * from MyTable where _1 = 4)"
    val ds = env.fromElements(
      (1: Byte, 2L, 1D, 1f, 1, 1: Short),
      (2: Byte, 2L, 1D, 1f, 1, 1: Short))
    val result = tEnv.sql(sqlQuery) // result == "0,0,0.0,0.0,0,0"
    val sqlQuery2 =
      "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " +
      "FROM (select * from MyTable where _1 = 4)"
    val result2 = tEnv.sql(sqlQuery2) // result2 == "0,0,0,0,0,0"
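
As a reference point, the intended result (both aggregates yielding 0 when the filter matches no rows) can be reproduced on a plain collection. This is only a sketch with hypothetical names. It does not use the Table API and only looks at the first column of the two rows above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone check; it does not use the Table API.
class EmptyAggregateSketch {

    // Sum of the values equal to `key`; 0 when nothing matches.
    static long sumWhereEquals(List<Integer> values, int key) {
        return values.stream()
                .filter(v -> v == key)
                .mapToLong(Integer::longValue)
                .sum();
    }

    // Number of values equal to `key`; 0 when nothing matches.
    static long countWhereEquals(List<Integer> values, int key) {
        return values.stream().filter(v -> v == key).count();
    }

    public static void main(String[] args) {
        List<Integer> firstColumn = Arrays.asList(1, 2); // _1 of the two rows
        System.out.println(sumWhereEquals(firstColumn, 4));   // 0
        System.out.println(countWhereEquals(firstColumn, 4)); // 0
    }
}
```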
Is this the correct solution for this ticket or not?
Best regards,
Anton Mushin