[jira] [Comment Edited] (FLINK-4832) Count/Sum 0 elements

2016-11-17 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667131#comment-15667131
 ] 

Anton Mushin edited comment on FLINK-4832 at 11/17/16 12:45 PM:


OK.
Could you check this 
[commit|https://github.com/apache/flink/compare/master...ex00:FLINK-4832] and 
tell me whether it is the right idea for implementing this issue?


was (Author: anmu):
OK.
Could you check this 
[commit|https://github.com/ex00/flink/commit/c93071585ebb21453b22c9c9d102964af06bf45a]
 and tell me whether it is the right idea for implementing this issue?

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this, maybe by unioning the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.
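
To illustrate the MapPartition idea from the description, here is a minimal sketch in plain Java, not Flink code. The class {{PartitionCountSketch}} and the method {{aggregatePartition}} are hypothetical names invented for this example. The assumption is that a partition-level function receives the whole iterator once, so it can emit a partial aggregate even when the iterator is empty.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

// Plain-Java illustration only; none of these names exist in Flink.
class PartitionCountSketch {

    /**
     * Consumes one whole partition and always returns a partial
     * aggregate {count, sum}, even if the partition has no elements.
     */
    static long[] aggregatePartition(Iterator<Long> values) {
        long count = 0L;
        long sum = 0L;
        while (values.hasNext()) {
            sum += values.next();
            count++;
        }
        return new long[] {count, sum};
    }

    public static void main(String[] args) {
        long[] empty = aggregatePartition(Collections.<Long>emptyIterator());
        long[] two = aggregatePartition(Arrays.asList(3L, 4L).iterator());
        System.out.println(empty[0] + "," + empty[1]); // prints 0,0
        System.out.println(two[0] + "," + two[1]);     // prints 2,7
    }
}
```

Because the function is invoked per partition rather than per element, the empty case still yields a record that later stages could merge.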



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4832) Count/Sum 0 elements

2016-10-20 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591955#comment-15591955
 ] 

Anton Mushin edited comment on FLINK-4832 at 10/20/16 2:31 PM:
---

Hello,
I think that 
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}
 needs to change as well, because 
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is 
called only if inputData contains elements.
{code}
TypeSerializer<IN> inSerializer =
    getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer =
    getOperatorInfo().getOutputType().createSerializer(executionConfig);
// The loop body runs once per element, so map() is never called for empty input.
for (IN element : inputData) {
    IN inCopy = inSerializer.copy(element);
    OUT out = function.map(inCopy);
    result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is changed, for example, to
{code}
override def initiate(partial: Row): Unit = {
  // Cast 0 to the sum type T of the class that extends SumAggregate[T].
  partial.setField(sumIndex, 0.asInstanceOf[T])
}
{code}
then the following tests pass:
{code}
@Test
def testSumNullElements(): Unit = {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)

  val sqlQuery =
    "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
      "FROM (select * from MyTable where _1 = 4)"

  val ds = env.fromElements(
    (1: Byte, 2L, 1D, 1f, 1, 1: Short),
    (2: Byte, 2L, 1D, 1f, 1, 1: Short))
  tEnv.registerDataSet("MyTable", ds)

  val result = tEnv.sql(sqlQuery)

  val expected = "0,0,0.0,0.0,0,0"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testCountNullElements(): Unit = {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)

  val sqlQuery =
    "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " +
      "FROM (select * from MyTable where _1 = 4)"

  val ds = env.fromElements(
    (1: Byte, 2L, 1D, 1f, 1, 1: Short),
    (2: Byte, 2L, 1D, 1f, 1, 1: Short))
  tEnv.registerDataSet("MyTable", ds)

  val result = tEnv.sql(sqlQuery)

  val expected = "0,0,0,0,0,0"
  val results = result.toDataSet[Row].collect()

  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}
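
The reasoning above (a per-element map never runs on empty input, so the sum has to start from an explicit 0) can be sketched in plain Java. This is only an illustration under my own assumptions, not the Flink implementation: {{EmptyInputSumSketch}}, {{mapEach}} and {{sumFromZero}} are hypothetical names, and ordinary lists stand in for the operator's input and output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration only; none of these names exist in Flink.
class EmptyInputSumSketch {

    /** Applies fn once per element, so fn is never invoked for an empty list. */
    static <I, O> List<O> mapEach(List<I> input, Function<I, O> fn) {
        List<O> result = new ArrayList<>();
        for (I element : input) {
            result.add(fn.apply(element));
        }
        return result;
    }

    /** Folds the partial values starting from 0, so an empty list sums to 0. */
    static long sumFromZero(List<Long> partials) {
        long sum = 0L;
        for (long partial : partials) {
            sum += partial;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Long> none = mapEach(
            Collections.<Integer>emptyList(), i -> Long.valueOf(i.longValue()));
        System.out.println(none.isEmpty());     // prints true
        System.out.println(sumFromZero(none));  // prints 0

        List<Long> some = mapEach(
            Arrays.asList(1, 2, 3), i -> Long.valueOf(i.longValue()));
        System.out.println(sumFromZero(some));  // prints 6
    }
}
```

The point of the sketch is that the zero has to come from the aggregation itself, because the mapping step contributes nothing when there are no elements.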


was (Author: anmu):
Hello,
I think that 
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}
 needs to change as well, because 
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is 
called only if inputData contains elements.
{code}
TypeSerializer<IN> inSerializer =
    getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer =
    getOperatorInfo().getOutputType().createSerializer(executionConfig);
// The loop body runs once per element, so map() is never called for empty input.
for (IN element : inputData) {
    IN inCopy = inSerializer.copy(element);
    OUT out = function.map(inCopy);
    result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is changed, for example, to
{code}
override def initiate(partial: Row): Unit = {
  // Cast 0 to the sum type T of the class that extends SumAggregate[T].
  partial.setField(sumIndex, 0.asInstanceOf[T])
}
{code}
then the following tests pass:
{code}
@Test
def testDataSetAggregation(): Unit = {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)

  val sqlQuery = "SELECT sum(_1) FROM MyTable"

  val ds = CollectionDataSets.get3TupleDataSet(env)
  tEnv.registerDataSet("MyTable", ds)

  val result = tEnv.sql(sqlQuery)

  val expected = "231"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testSumNullElements(): Unit = {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)

  val sqlQuery =
    "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
      "FROM (select * from MyTable where _1 = 4)"

  val ds = env.fromElements(
    (1: Byte, 2L, 1D, 1f, 1, 1: Short),
    (2: Byte, 2L, 1D, 1f, 1, 1: Short))
  tEnv.registerDataSet("MyTable", ds)

  val result = tEnv.sql(sqlQuery)

  val expected = "0,0,0.0,0.0,0,0"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project