[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15705147#comment-15705147
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2840
  
Merging


> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this. Maybe by unioning the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.
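A minimal plain-Scala sketch of the null-row idea, with no Flink dependency. `Row`, `Partial`, `reduceAll` and `countAndSum` are hypothetical names. The sketch assumes, as the issue describes, that a global reduce over an empty input emits no record at all, and shows how unioning one all-null row fixes that without changing results for non-empty input, because COUNT and SUM skip NULLs:

```scala
// Hypothetical row model (not Flink's Row): each column is a nullable Long,
// with None standing for SQL NULL.
type Row = Vector[Option[Long]]

// Partial aggregate for one column: (COUNT(col), SUM(col)).
type Partial = (Long, Option[Long])

// COUNT ignores NULLs; SUM ignores NULLs and stays NULL if no value was seen.
def toPartial(row: Row, col: Int): Partial =
  (if (row(col).isDefined) 1L else 0L, row(col))

def merge(a: Partial, b: Partial): Partial = {
  val sum = (a._2, b._2) match {
    case (Some(x), Some(y)) => Some(x + y)
    case (x, None)          => x
    case (None, y)          => y
  }
  (a._1 + b._1, sum)
}

// Models a global reduce that, as described in the issue, emits no record
// when its input is empty.
def reduceAll[A](rows: Seq[A])(f: (A, A) => A): Seq[A] =
  rows.reduceOption(f).toSeq

// COUNT(col) and SUM(col) over the input, optionally unioned with one all-null row.
def countAndSum(input: Seq[Row], col: Int, arity: Int, withNullRow: Boolean): Seq[Partial] = {
  val rows =
    if (withNullRow) input :+ Vector.fill[Option[Long]](arity)(None)
    else input
  reduceAll(rows.map(r => toPartial(r, col)))(merge)
}
```

Without the null row, an empty input yields no result record. With it, the result is a single record with COUNT = 0 and SUM = NULL (the SQL result of SUM over no values), and for non-empty input the extra row changes neither aggregate.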



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15705051#comment-15705051
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2840
  
Thanks for the update @ex00!
PR is good to merge!



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704779#comment-15704779
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2840
  
Hi @fhueske, thanks for your review and comments!
I updated the PR.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702284#comment-15702284
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89806470
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan._
+import scala.collection.JavaConversions._
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+
+/**
+  * Rule for insert [[Row]] with null records into a [[DataSetAggregate]]
+  * Rule apply for non grouped aggregate query
+  */
+class DataSetAggregateWithNullValuesRule
+  extends ConverterRule(
+classOf[LogicalAggregate],
+Convention.NONE,
+DataSetConvention.INSTANCE,
+"DataSetAggregateWithNullValuesRule")
+{
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+//for grouped agg sets shouldn't attach of null row
+//need apply other rules. e.g. [[DataSetAggregateRule]]
+if (!agg.getGroupSet.isEmpty) {
+  return false
+}
+
+// TODO code duplicates DataSetAggregateRule#matches
+// check if we have distinct aggregates
+val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+if (distinctAggs) {
+  throw TableException("DISTINCT aggregates are currently not supported.")
+}
+
+// check if we have grouping sets
+val groupSets = agg.getGroupSets.size() == 0 || agg.getGroupSets.get(0) != agg.getGroupSet
+if (groupSets || agg.indicator) {
+  throw TableException("GROUPING SETS are currently not supported.")
+}
+!distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+val cluster: RelOptCluster = rel.getCluster
+
+val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType)
+val nullLiterals :ImmutableList[ImmutableList[RexLiteral]] =
+  ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+for (fieldType <- fieldTypes)
+  yield {
+cluster.getRexBuilder.
+  makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral]
+  }))
+
+val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
+val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
+val logicalAggregate = new LogicalAggregate(
--- End diff --

I think we do not need the `LogicalAggregate`; we can directly create the 
`DataSetAggregate`.
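The `matches` method in the rule above returns `false` for grouped aggregates. A plain-Scala sketch (hypothetical names, not Flink code) of why the null row is only safe for global aggregates: with GROUP BY, the extra row forms its own group with a NULL key, and over an empty input it would produce one output row where SQL expects none.

```scala
// Hypothetical row model: each column is a nullable Long (None = SQL NULL).
type Row = Vector[Option[Long]]

// Appends one all-null row, as the null-row rule does for global aggregates.
def withNullRow(rows: Seq[Row], arity: Int): Seq[Row] =
  rows :+ Vector.fill[Option[Long]](arity)(None)

// GROUP BY keyCol, COUNT(valueCol): one output entry per distinct key.
def groupedCount(rows: Seq[Row], keyCol: Int, valueCol: Int): Map[Option[Long], Long] =
  rows.groupBy(r => r(keyCol)).map { case (key, group) =>
    key -> group.count(r => r(valueCol).isDefined).toLong
  }
```

With two input rows that share key 1, `groupedCount` returns a single group; after `withNullRow`, an additional `None -> 0` group appears, which is the wrong answer for a GROUP BY query.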



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702287#comment-15702287
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89808397
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Test for testing aggregate plans.
+  */
+class AggregationTest extends TableTestBase {
+
+  @Test
+  def testAggregateQueryBatchSQL(): Unit = {
+val util = batchTestUtil()
+util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+val sqlQuery = (
+  "SELECT avg(a),sum(b),count(c) FROM MyTable",
--- End diff --

I would separate the test into two methods.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702285#comment-15702285
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89808446
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Test for testing aggregate plans.
+  */
+class AggregationTest extends TableTestBase {
+
+  @Test
+  def testAggregateQueryBatchSQL(): Unit = {
+val util = batchTestUtil()
+util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+val sqlQuery = (
+  "SELECT avg(a),sum(b),count(c) FROM MyTable",
+  "SELECT avg(a),sum(b),count(c) FROM MyTable WHERE a = 1"
+)
+
+val calcNode = unaryNode(
+  "DataSetCalc",
+  batchTableNode(0),
+  term("select", "a", "b", "c"),
+  term("where", "=(a, 1)")
+)
+
+val setValues = (
+  unaryNode(
+"DataSetValues",
+batchTableNode(0),
+tuples(List(null,null,null)),
+term("values","a","b","c")
+  ),
+  unaryNode(
+"DataSetValues",
+calcNode,
+tuples(List(null,null,null)),
+term("values","a","b","c")
+  )
+)
+val union = (
+  unaryNode(
+  "DataSetUnion",
+  setValues._1,
+  term("union","a","b","c")
+),
+  unaryNode(
+  "DataSetUnion",
+  setValues._2,
+  term("union","a","b","c")
+  )
+)
+
+val aggregate = (
+  unaryNode(
+"DataSetAggregate",
+union._1,
+term("select",
+  "AVG(a) AS EXPR$0",
+  "SUM(b) AS EXPR$1",
+  "COUNT(c) AS EXPR$2")
+  ),
+  unaryNode(
+"DataSetAggregate",
+union._2,
+term("select",
+  "AVG(a) AS EXPR$0",
+  "SUM(b) AS EXPR$1",
+  "COUNT(c) AS EXPR$2")
+  )
+)
+util.verifySql(sqlQuery._1, aggregate._1)
+util.verifySql(sqlQuery._2, aggregate._2)
+  }
+
+  @Test
+  def testAggregateGroupQueryBatchSQL(): Unit = {
+val util = batchTestUtil()
+util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+val sqlQuery = (
+  "SELECT avg(a),sum(b),count(c) FROM MyTable GROUP BY a",
--- End diff --

I would separate the test into two methods.




[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702286#comment-15702286
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89809360
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
 ---
@@ -400,5 +400,37 @@ class AggregationsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testAggregateEmptyDataSets(): Unit = {
--- End diff --

Table API and SQL are internally represented the same way.
We can drop the expensive ITCase for the Table API and add tests that check 
the plan based on `TableTestBase` for the Table API instead.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702148#comment-15702148
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2840
  
Hi,
I updated the PR: I added a new rule for non-grouped aggregates and an 
`AggregationTest` that checks the query plans.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690049#comment-15690049
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2840
  
If we fix this issue with a `RelOptRule`, we can do all tests with the 
`TableTestBase`, because the `DataSetValues` with the empty record will show up 
in the plan.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690003#comment-15690003
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2840
  
Hi @StephanEwen,
I found a way to check these changes via `TableTestBase`, as @fhueske suggested:
```scala
  @Test
  def testAggregateQueryBatchSQL(): Unit = {
    val util = batchTestUtil()
    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)

    val sqlQuery = (
      "SELECT avg(a),sum(b),count(c) FROM MyTable",
      "SELECT avg(a),sum(b),count(c) FROM MyTable WHERE a = 1"
    )

    val calcNode = unaryNode(
      "DataSetCalc",
      batchTableNode(0),
      term("select", "a", "b", "c"),
      term("where", "=(a, 1)")
    )

    val aggregate = (
      unaryNode(
        "DataSetAggregate",
        batchTableNode(0),
        term("select",
          "AVG(a) AS EXPR$0",
          "SUM(b) AS EXPR$1",
          "COUNT(c) AS EXPR$2")
      ),
      unaryNode(
        "DataSetAggregate",
        calcNode,
        term("select",
          "AVG(a) AS EXPR$0",
          "SUM(b) AS EXPR$1",
          "COUNT(c) AS EXPR$2")
      )
    )
    util.verifySql(sqlQuery._1, aggregate._1)
    util.verifySql(sqlQuery._2, aggregate._2)
  }
```
However, this test only checks the SQL plan of the aggregate queries, so I will 
look for other ways to check this via unit tests.
@StephanEwen, @fhueske, what is your opinion on this?



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689878#comment-15689878
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2840
  
Is it possible to check this via unit tests rather than ITCases?



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689481#comment-15689481
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89277461
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -157,4 +161,41 @@ class DataSetAggregate(
   case _ => result
 }
   }
+
+  /**
+* Dummy [[Row]] into a [[DataSet]] for result after map operations.
+* @param mapOperator after which insert dummy records
+* @param tableEnv [[BatchTableEnvironment]] for getting rel builder and type factory
+* @tparam IN mapOperator input type
+* @tparam OUT mapOperator output type
+* @return DataSet of type Row is contains null literals for columns
+*/
+  private def dummyRow[IN,OUT](
+  mapOperator: MapOperator[IN,OUT],
+  tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+val builder: RelDataTypeFactory.FieldInfoBuilder = getCluster.getTypeFactory.builder
+val rowInfo = mapOperator.getResultType.asInstanceOf[RowTypeInfo]
+
+val nullLiterals :ImmutableList[ImmutableList[RexLiteral]] =
+  ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+for (fieldName <- rowInfo.getFieldNames)
+  yield {
+val columnType = tableEnv.getTypeFactory
+  .createTypeFromTypeInfo(rowInfo.getTypeAt(fieldName))
+builder.add(fieldName, columnType)
+tableEnv.getRelBuilder.getRexBuilder
+  .makeLiteral(null,columnType,false).asInstanceOf[RexLiteral]
+  }))
+
+val dataType = builder.build()
+
+val relNode = RelFactories.DEFAULT_VALUES_FACTORY
+  .createValues(getCluster, dataType, nullLiterals)
+
+DataSetValuesRule.INSTANCE.asInstanceOf[DataSetValuesRule]
--- End diff --

Sorry, I don't understand your point. Could you explain, please?
Do you propose adding a new `DataSetNullValues extends LogicalAggregate with 
DataSetRel` so that it can be translated to a `DataSet`, and creating a rule 
that initializes the new `DataSetNullValues`? Is that correct?

Or could I use `DataSetValues` directly? For example:
```scala
val relNode = RelFactories.DEFAULT_VALUES_FACTORY
  .createValues(getCluster, dataType, nullLiterals)

new DataSetValues(
  cluster,
  relNode.getTraitSet.replace(DataSetConvention.INSTANCE),
  dataType,
  nullLiterals,
  "DataSetNullValuesRule"
).translateToPlan(tableEnv, Some(TypeConverter.DEFAULT_ROW_TYPE)).asInstanceOf[DataSet[Row]]
```



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686433#comment-15686433
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2840
  
Hi @fhueske,
>Actually, I think implementing the fix as an optimizer rule would be the nicer solution. In that case we could transform one of the ITCases into a test that extends the TableTestBase.
>What do you think?

I think it is a good idea. I can create `AggregationTest extends 
TableTestBase`, where we will verify the SQL plans of queries containing 
aggregate functions.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684617#comment-15684617
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88978610
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -157,4 +161,41 @@ class DataSetAggregate(
   case _ => result
 }
   }
+
+  /**
+* Dummy [[Row]] into a [[DataSet]] for result after map operations.
+* @param mapOperator after which insert dummy records
+* @param tableEnv [[BatchTableEnvironment]] for getting rel builder and type factory
+* @tparam IN mapOperator input type
+* @tparam OUT mapOperator output type
+* @return DataSet of type Row is contains null literals for columns
+*/
+  private def dummyRow[IN,OUT](
+  mapOperator: MapOperator[IN,OUT],
+  tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+val builder: RelDataTypeFactory.FieldInfoBuilder = getCluster.getTypeFactory.builder
+val rowInfo = mapOperator.getResultType.asInstanceOf[RowTypeInfo]
+
+val nullLiterals :ImmutableList[ImmutableList[RexLiteral]] =
+  ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+for (fieldName <- rowInfo.getFieldNames)
+  yield {
+val columnType = tableEnv.getTypeFactory
+  .createTypeFromTypeInfo(rowInfo.getTypeAt(fieldName))
+builder.add(fieldName, columnType)
+tableEnv.getRelBuilder.getRexBuilder
+  .makeLiteral(null,columnType,false).asInstanceOf[RexLiteral]
+  }))
+
+val dataType = builder.build()
+
+val relNode = RelFactories.DEFAULT_VALUES_FACTORY
+  .createValues(getCluster, dataType, nullLiterals)
+
+DataSetValuesRule.INSTANCE.asInstanceOf[DataSetValuesRule]
--- End diff --

Can we directly create a `ValuesInputFormat` as in `DataSetValues`? Using 
the `DataSetValuesRule` outside of the optimizer does not seem like a clean 
design.

Alternatively, we can think about implementing the whole fix as a 
RelOptRule which injects a LogicalValues with a null row in front of a 
LogicalAggregate without a grouping set. In addition, we would need to make sure 
that the LogicalAggregate is only translated into a DataSetAggregate if the 
LogicalValues exists.
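The optimizer-rule alternative can be sketched on a toy plan tree. `Scan`, `Values`, `Union`, `Aggregate`, `injectNullRow` and `explain` are hypothetical stand-ins, not Calcite or Flink classes. The rewrite puts a one-row, all-null `Values` under a union in front of every aggregate without grouping keys, leaves grouped aggregates alone, and checks for an existing null row so that it does not fire again on its own output:

```scala
// Hypothetical, simplified logical plan nodes (stand-ins, not Calcite classes).
sealed trait Plan
final case class Scan(table: String, fields: List[String]) extends Plan
final case class Values(fields: List[String], rows: List[List[Option[Any]]]) extends Plan
final case class Union(left: Plan, right: Plan) extends Plan
final case class Aggregate(input: Plan, groupBy: List[String], calls: List[String]) extends Plan

def fieldsOf(plan: Plan): List[String] = plan match {
  case Scan(_, fs)        => fs
  case Values(fs, _)      => fs
  case Union(left, _)     => fieldsOf(left)
  case Aggregate(_, g, c) => g ++ c
}

// True if the input already starts with a single all-null Values row.
def hasNullRow(input: Plan): Boolean = input match {
  case Union(Values(_, List(row)), _) => row.forall(_.isEmpty)
  case _                              => false
}

// Injects Union(Values(null row), input) below every aggregate without GROUP BY.
// Grouped aggregates are left unchanged; the guard keeps the rewrite idempotent.
def injectNullRow(plan: Plan): Plan = plan match {
  case Aggregate(input, Nil, calls) if !hasNullRow(input) =>
    val fields = fieldsOf(input)
    val nullRow = Values(fields, List(fields.map(_ => None)))
    Aggregate(Union(nullRow, injectNullRow(input)), Nil, calls)
  case Aggregate(input, groupBy, calls) =>
    Aggregate(injectNullRow(input), groupBy, calls)
  case Union(left, right) =>
    Union(injectNullRow(left), injectNullRow(right))
  case other => other
}

// Renders the plan as an indented tree, similar in spirit to an EXPLAIN output.
def explain(plan: Plan, depth: Int = 0): String = {
  val pad = "  " * depth
  plan match {
    case Scan(t, _)       => s"${pad}Scan($t)\n"
    case Values(fs, rows) => s"${pad}Values(${fs.mkString(", ")}; rows=${rows.size})\n"
    case Union(l, r)      => s"${pad}Union\n" + explain(l, depth + 1) + explain(r, depth + 1)
    case Aggregate(input, g, c) =>
      s"${pad}Aggregate(groupBy=[${g.mkString(", ")}]; ${c.mkString(", ")})\n" +
        explain(input, depth + 1)
  }
}
```

Because the null row becomes a node in the plan, checking the rendered plan for `Values(a, b, c; rows=1)` is enough to verify the rewrite, which mirrors the point made in this thread that a rule-based fix can be covered by plan-based `TableTestBase` tests instead of ITCases.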



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684619#comment-15684619
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88978720
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
 ---
@@ -258,4 +258,42 @@ class AggregationsITCase(
 // must fail. grouping sets are not supported
 tEnv.sql(sqlQuery).toDataSet[Row]
   }
+
+  @Test
+  def testDummyRecords(): Unit = {
--- End diff --

Please rename the method to `testAggregateEmptyDataSets`.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684616#comment-15684616
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88979147
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
 ---
@@ -400,5 +400,36 @@ class AggregationsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testDummyRecords(): Unit = {
--- End diff --

Please rename this method.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684618#comment-15684618
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88979272
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -133,7 +137,7 @@ class DataSetAggregate(
   else {
 // global aggregation
 val aggOpName = s"select:($aggString)"
-mappedInput.asInstanceOf[DataSet[Row]]
+mappedInput.asInstanceOf[DataSet[Row]].union(dummyRow(mappedInput,tableEnv))
--- End diff --

Please add a space after the comma.



[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684615#comment-15684615
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88977919
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -157,4 +161,41 @@ class DataSetAggregate(
   case _ => result
 }
   }
+
+  /**
+* Dummy [[Row]] into a [[DataSet]] for result after map operations.
+* @param mapOperator after which insert dummy records
+* @param tableEnv [[BatchTableEnvironment]] for getting rel builder and type factory
+* @tparam IN mapOperator input type
+* @tparam OUT mapOperator output type
+* @return DataSet of type Row is contains null literals for columns
+*/
+  private def dummyRow[IN,OUT](
--- End diff --

I would not call this "Dummy row" but "Null row". Dummy row sounds like a 
useless mock-up, but our null row has a good purpose.


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683424#comment-15683424
 ] 

Anton Mushin commented on FLINK-4832:
-

Thanks for your reply.
Done :)


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683420#comment-15683420
 ] 

ASF GitHub Bot commented on FLINK-4832:
---

GitHub user ex00 opened a pull request:

https://github.com/apache/flink/pull/2840

[FLINK-4832] Count/Sum 0 elements

Hello.
Currently, if the `AggregateDataSet` is empty, we are unable to count or sum up 
0 elements.
These changes make aggregate functions return the correct result by unioning 
a dummy row with the `AggregateDataSet`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ex00/flink FLINK-4832

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2840.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2840


commit 62f5fd6a0f6f808126d079353b1f1d9976ccac35
Author: Anton Mushin 
Date:   2016-11-21T11:49:41Z

[FLINK-4832] Count/Sum 0 elements

aggregateDataSet is unioned with a DataSet that contains dummy records, so that 
aggregate functions are calculated correctly if the source aggregateDataSet is empty




[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683017#comment-15683017
 ] 

Fabian Hueske commented on FLINK-4832:
--

The overall approach looks good, IMO.
Can you open a pull request?

Thanks, Fabian

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-21 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15682939#comment-15682939
 ] 

Anton Mushin commented on FLINK-4832:
-

Hello everyone,
I've updated the implementation according to the changes in FLINK-4263.
Could somebody please check whether these 
[changes|https://github.com/apache/flink/compare/master...ex00:FLINK-4832] 
are the correct idea?


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-18 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676435#comment-15676435
 ] 

Anton Mushin commented on FLINK-4832:
-

Hi [~twalthr],
I'm having trouble with the following code:
{code:java}
@Test
public void testValuesWithCast() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	String sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
		"(2, cast(2 as BIGINT))," +
		"(3, cast(3 as BIGINT))";
	Table result = tableEnv.sql(sqlQuery);
	DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
	resultSet.print();
	List<Row> results = resultSet.collect();
	String expected = "1,1\n2,2\n3,3";
	compareResultAsText(results, expected);
}
{code}
I get the following output:
{noformat}
1,1
2,2

java.lang.AssertionError: Wrong number of elements result 
Expected :3
Actual   :2
{noformat}
Should I not use the {{cast}} operator in {{VALUES}}, or is this a bug?

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-15 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667131#comment-15667131
 ] 

Anton Mushin commented on FLINK-4832:
-

Ok.
Could you check whether this 
[commit|https://github.com/ex00/flink/commit/c93071585ebb21453b22c9c9d102964af06bf45a] 
is the correct idea for implementing this issue?

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15666782#comment-15666782
 ] 

Timo Walther commented on FLINK-4832:
-

We should fix FLINK-4263 before we implement this issue. I will assign it 
to myself.

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652028#comment-15652028
 ] 

Fabian Hueske commented on FLINK-4832:
--

It is true that Flink does not support {{null}} fields in record types, i.e., 
Flink's Java Tuples and Scala's Tuples.
However, the {{Row}} type used for aggregations does support {{null}} fields. 

Have a look at how the {{DataSetValues}} class creates a {{DataSet}} for a 
given set of row values.
This technique and the {{ValuesInputFormat}} can be used to create a DataSet 
with a single row with all fields being {{null}}.

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-08 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650089#comment-15650089
 ] 

Anton Mushin commented on FLINK-4832:
-

Hello [~fhueske]
bq. We need to inject a Union before the Map with static data set with a single 
record, that contains only null values for each field.
How can I create a DataSet with null values for each field? As I understand it, 
Flink does not support empty DataSets or records with null values.
For example:
{code}
val ds = env.fromElements(
  (1: Byte, 1: Short),
  (2: Byte, 2: Short),
  (null, null))
  .toTable(tEnv, 'a, 'b)
// ==> Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered: ANY

// file is empty
env.readCsvFile[(Byte, Short)](file, includedFields = Array(0, 1)).collect()
// ==> org.apache.flink.api.common.io.ParseException: Row too short:
{code}
Do we need to add support for empty DataSets as part of this issue?

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-01 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626549#comment-15626549
 ] 

Fabian Hueske commented on FLINK-4832:
--

Currently, a {{DataSetAggregate}} is translated into a {{MapFunction}} followed 
by a {{GroupReduceFunction}}.
The {{MapFunction}} initializes the aggregates and the {{GroupReduceFunction}} 
computes the aggregates.

I think we need to do the following: inject a {{Union}} before the Map with a 
static data set that contains a single record whose fields are all null.
The {{CountAggregate}} initializes to {{0}} (which is what we want), all other 
functions initialize to {{null}} and ignore the record.


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-11-01 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15625448#comment-15625448
 ] 

Anton Mushin commented on FLINK-4832:
-

Hi everyone.
I want to implement this issue and I have some questions:
1. Is it necessary to add a new method to DataSet that checks for an empty set?
2. What should the dummy record be? Is it an empty list, or something else?


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-10-25 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15605349#comment-15605349
 ] 

Aljoscha Krettek commented on FLINK-4832:
-

+1 That's exactly what I said earlier ... ;-)

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-10-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15602184#comment-15602184
 ] 

Fabian Hueske commented on FLINK-4832:
--

[~jark] is right. A zero count is only necessary for non-grouped aggregates. We 
should check the semantics of the remaining aggregation functions for this 
case as well.

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-10-24 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15602113#comment-15602113
 ] 

Jark Wu commented on FLINK-4832:


I have tried to count elements in an empty table in MySQL. Without GROUP BY, 
the result is zero. With a GROUP BY, the result is empty. So maybe unioning a 
dummy record could work?
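The semantics described above (which standard SQL also prescribes) can be sketched without Flink. This hypothetical model treats the input as a list of grouping keys, one per row: a global `COUNT(*)` always produces exactly one result row, while a grouped count produces one row per key that actually occurs, and therefore no rows for an empty input. `EmptyInputCountSemantics` and its methods are illustrative names only.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of COUNT(*) semantics; each list element is the grouping key of one input row.
final class EmptyInputCountSemantics {

    // SELECT COUNT(*) FROM t: always exactly one result row, even for an empty input.
    static List<Long> globalCount(List<String> keys) {
        return Collections.singletonList((long) keys.size());
    }

    // SELECT k, COUNT(*) FROM t GROUP BY k: one result row per key that occurs,
    // so an empty input yields an empty result.
    static Map<String, Long> groupedCount(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```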

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-10-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15601955#comment-15601955
 ] 

Aljoscha Krettek commented on FLINK-4832:
-

Counting {{0}} elements using a union with a dummy would only be possible on a 
global aggregation, not on a keyed aggregation. For the keyed aggregation, you 
would have to insert a dummy for every key, but what is "every key"?
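To illustrate the keyed case, here is a hypothetical, Flink-free model of a grouped `COUNT(value)` where a single dummy record (null key, null value) is unioned into the input, with rows stored as `{key, value}` pairs in `Object[]`. Instead of giving every real key a zero count, the dummy only forms an extra group of its own under the null key, and it does so even for an empty input, where the grouped result should be empty. `KeyedDummyPitfall` is an illustrative name, not Flink code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: keyed COUNT(value) over {key, value} rows after unioning one dummy record.
final class KeyedDummyPitfall {

    static Map<Object, Long> groupedCountWithDummy(List<Object[]> rows) {
        List<Object[]> withDummy = new ArrayList<>(rows);
        withDummy.add(new Object[] {null, null}); // the dummy record: null key, null value

        // LinkedHashMap permits a null key, so the dummy ends up in a group of its own.
        Map<Object, Long> counts = new LinkedHashMap<>();
        for (Object[] row : withDummy) {
            long contribution = (row[1] == null) ? 0L : 1L; // COUNT ignores null values
            Long previous = counts.get(row[0]);
            counts.put(row[0], (previous == null ? 0L : previous) + contribution);
        }
        return counts;
    }
}
```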

[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-10-20 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591955#comment-15591955
 ] 

Anton Mushin commented on FLINK-4832:
-

Hello,
I think that {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} 
also needs to change, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} 
is only called for elements that are present in inputData:
{code}
TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN element : inputData) {
    IN inCopy = inSerializer.copy(element);
    OUT out = function.map(inCopy);
    result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is changed, for example, to
{code}
override def initiate(partial: Row): Unit = {
  // cast 0 to the type T of the concrete class that extends SumAggregate[T]
  partial.setField(sumIndex, 0.asInstanceOf[T])
}
{code}
then the following tests pass:
{code}
  @Test
  def testDataSetAggregation(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env, config)

    val sqlQuery = "SELECT sum(_1) FROM MyTable"

    val ds = CollectionDataSets.get3TupleDataSet(env)
    tEnv.registerDataSet("MyTable", ds)

    val result = tEnv.sql(sqlQuery)

    val expected = "231"
    val results = result.toDataSet[Row].collect()
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  @Test
  def testSumNullElements(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env, config)

    val sqlQuery =
      "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
        "FROM (select * from MyTable where _1 = 4)"

    val ds = env.fromElements(
      (1: Byte, 2L, 1D, 1f, 1, 1: Short),
      (2: Byte, 2L, 1D, 1f, 1, 1: Short))
    tEnv.registerDataSet("MyTable", ds)

    val result = tEnv.sql(sqlQuery)

    val expected = "0,0,0.0,0.0,0,0"
    val results = result.toDataSet[Row].collect()
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}
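The observation about `executeOnCollections` can be modelled without Flink: because the map step runs once per input element, an empty input never reaches `map` (or any `initiate` logic inside it), so there is no partial aggregate for the reduce step to merge. The sketch is hypothetical; `MapThenReduceModel` is not a Flink class, and it assumes, as the issue reports, that the reduce step emits no row when it receives no partials.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the map-then-reduce translation of a global COUNT.
final class MapThenReduceModel {

    int mapCalls = 0; // how many times the per-element map step ran

    // Per-element map step: initializes a partial count of 1 for one element.
    long mapToPartialCount(Object element) {
        mapCalls++;
        return 1L;
    }

    // Global reduce step: merges the partials; with no partials it emits no row at all.
    Optional<Long> countAll(List<?> input) {
        List<Long> partials = new ArrayList<>();
        for (Object element : input) {
            partials.add(mapToPartialCount(element));
        }
        if (partials.isEmpty()) {
            return Optional.empty();
        }
        long total = 0;
        for (long partial : partials) {
            total += partial;
        }
        return Optional.of(total);
    }
}
```

In this model, changing only the initial value inside the map step cannot help for empty input, which is consistent with the proposal elsewhere in this thread to union an all-null record into the input before the map step.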
