[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15705147#comment-15705147 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2840

Merging

> Count/Sum 0 elements
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Anton Mushin
>
> Currently, the Table API is unable to count or sum up 0 elements. We should
> improve DataSet aggregations for this. Maybe by union the original DataSet
> with a dummy record or by using a MapPartition function. Coming up with a
> good design for this is also part of this issue.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
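Why a dummy record helps can be shown with a small plain-Scala model. This is a sketch, not Flink code: `Partial`, `toPartial`, `merge`, `globalAggregate` and `withNullRow` are hypothetical names, `None` stands in for SQL NULL, and the global aggregation is modeled as a reduce, so an empty input produces no result record at all (the symptom this issue describes). Because COUNT(col) and SUM(col) skip NULL inputs, one extra all-NULL row gives COUNT = 0 and SUM = NULL for an empty input while leaving the result for non-empty inputs unchanged. The model only covers column aggregates that ignore NULLs.

```scala
// Toy model of a global (non-grouped) COUNT(col) and SUM(col) over one column.
// None stands in for SQL NULL; none of these names come from Flink.
final case class Partial(count: Long, sum: Option[Long])

// Lift one input value into a partial aggregate; a NULL contributes nothing.
def toPartial(v: Option[Long]): Partial =
  Partial(if (v.isDefined) 1L else 0L, v)

// Combine two partial aggregates; SUM stays NULL until a non-NULL value shows up.
def merge(a: Partial, b: Partial): Partial = {
  val sum = (a.sum, b.sum) match {
    case (Some(x), Some(y)) => Some(x + y)
    case (x, None)          => x
    case (None, y)          => y
  }
  Partial(a.count + b.count, sum)
}

// Reduce-style global aggregation: an empty input yields no result at all.
def globalAggregate(column: Seq[Option[Long]]): Option[Partial] =
  column.map(toPartial).reduceOption(merge)

// The workaround from the issue: put one all-NULL row in front of the input.
def withNullRow(column: Seq[Option[Long]]): Seq[Option[Long]] =
  None +: column

val empty = Seq.empty[Option[Long]]
println(globalAggregate(empty))                                // no result row
println(globalAggregate(withNullRow(empty)))                   // COUNT = 0, SUM = NULL
println(globalAggregate(withNullRow(Seq(Some(3L), Some(4L))))) // COUNT = 2, SUM = 7
```

The trick relies entirely on NULL-skipping semantics; an aggregate that counted every input row regardless of NULLs would see the extra row.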
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15705051#comment-15705051 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2840

Thanks for the update @ex00! PR is good to merge!
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704779#comment-15704779 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2840

Hi @fhueske, thanks for your review and comments! I updated the PR.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702284#comment-15702284 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89806470

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan._
+import scala.collection.JavaConversions._
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
+
+/**
+ * Rule for insert [[Row]] with null records into a [[DataSetAggregate]]
+ * Rule apply for non grouped aggregate query
+ */
+class DataSetAggregateWithNullValuesRule
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetAggregateWithNullValuesRule")
+{
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    //for grouped agg sets shouldn't attach of null row
+    //need apply other rules. e.g. [[DataSetAggregateRule]]
+    if (!agg.getGroupSet.isEmpty) {
+      return false
+    }
+
+    // TODO code duplicates DataSetAggregateRule#matches
+    // check if we have distinct aggregates
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    if (distinctAggs) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() == 0 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+    !distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+    val cluster: RelOptCluster = rel.getCluster
+
+    val fieldTypes = agg.getInput.getRowType.getFieldList.map(_.getType)
+    val nullLiterals :ImmutableList[ImmutableList[RexLiteral]] =
+      ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+        for (fieldType <- fieldTypes)
+          yield {
+            cluster.getRexBuilder.
+              makeLiteral(null, fieldType, false).asInstanceOf[RexLiteral]
+          }))
+
+    val logicalValues = LogicalValues.create(cluster, agg.getInput.getRowType, nullLiterals)
+    val logicalUnion = LogicalUnion.create(List(logicalValues, agg.getInput), true)
+    val logicalAggregate = new LogicalAggregate(
--- End diff --

I think we do not need the `LogicalAggregate` but can directly create the `DataSetAggregate`
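The review suggestion is that the rule can emit the physical aggregate over the union directly instead of first building an intermediate `LogicalAggregate`. The toy sketch below shows only the shape of such a conversion. `Scan`, `NullValues`, `Union`, `LogicalAgg` and `PhysicalAgg` are hypothetical stand-ins, not Calcite or Flink classes; trait sets, child conversion and the DISTINCT / GROUPING SETS checks of the reviewed rule are deliberately left out, and only the empty-group-set check is modeled.

```scala
// Hypothetical plan nodes, loosely named after the nodes in the rule under review.
sealed trait Plan { def fields: List[String] }
final case class Scan(table: String, fields: List[String]) extends Plan
// A single row whose fields are all NULL (cf. LogicalValues with null literals).
final case class NullValues(fields: List[String]) extends Plan
// Bag union (UNION ALL); both inputs must expose the same fields.
final case class Union(first: Plan, second: Plan) extends Plan {
  require(first.fields == second.fields, "union inputs must have the same fields")
  def fields: List[String] = first.fields
}
final case class LogicalAgg(groupKeys: List[String], calls: List[String], input: Plan) extends Plan {
  def fields: List[String] = groupKeys ++ calls
}
final case class PhysicalAgg(groupKeys: List[String], calls: List[String], input: Plan) extends Plan {
  def fields: List[String] = groupKeys ++ calls
}

// Like the group-set check in matches(): only non-grouped aggregates qualify.
def matches(agg: LogicalAgg): Boolean = agg.groupKeys.isEmpty

// Build the physical aggregate directly on top of (NULL row UNION ALL input),
// without creating an intermediate logical aggregate first.
def convert(agg: LogicalAgg): PhysicalAgg = {
  val nullRow = NullValues(agg.input.fields)
  PhysicalAgg(agg.groupKeys, agg.calls, Union(nullRow, agg.input))
}

val scan = Scan("MyTable", List("a", "b", "c"))
val global = LogicalAgg(Nil, List("SUM(b)", "COUNT(c)"), scan)
println(convert(global))
```

Skipping the intermediate node keeps the rule a single conversion step, which is the simplification the comment points at.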
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702287#comment-15702287 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89808397

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Test for testing aggregate plans.
+ */
+class AggregationTest extends TableTestBase {
+
+  @Test
+  def testAggregateQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = (
+      "SELECT avg(a),sum(b),count(c) FROM MyTable",
--- End diff --

I would separate the test into two methods.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702285#comment-15702285 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89808446

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Test for testing aggregate plans.
+ */
+class AggregationTest extends TableTestBase {
+
+  @Test
+  def testAggregateQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = (
+      "SELECT avg(a),sum(b),count(c) FROM MyTable",
+      "SELECT avg(a),sum(b),count(c) FROM MyTable WHERE a = 1"
+    )
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues = (
+      unaryNode(
+        "DataSetValues",
+        batchTableNode(0),
+        tuples(List(null,null,null)),
+        term("values","a","b","c")
+      ),
+      unaryNode(
+        "DataSetValues",
+        calcNode,
+        tuples(List(null,null,null)),
+        term("values","a","b","c")
+      )
+    )
+    val union = (
+      unaryNode(
+        "DataSetUnion",
+        setValues._1,
+        term("union","a","b","c")
+      ),
+      unaryNode(
+        "DataSetUnion",
+        setValues._2,
+        term("union","a","b","c")
+      )
+    )
+
+    val aggregate = (
+      unaryNode(
+        "DataSetAggregate",
+        union._1,
+        term("select",
+          "AVG(a) AS EXPR$0",
+          "SUM(b) AS EXPR$1",
+          "COUNT(c) AS EXPR$2")
+      ),
+      unaryNode(
+        "DataSetAggregate",
+        union._2,
+        term("select",
+          "AVG(a) AS EXPR$0",
+          "SUM(b) AS EXPR$1",
+          "COUNT(c) AS EXPR$2")
+      )
+    )
+    util.verifySql(sqlQuery._1, aggregate._1)
+    util.verifySql(sqlQuery._2, aggregate._2)
+  }
+
+  @Test
+  def testAggregateGroupQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = (
+      "SELECT avg(a),sum(b),count(c) FROM MyTable GROUP BY a",
--- End diff --

I would separate the test into two methods.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702286#comment-15702286 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89809360

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala ---
@@ -400,5 +400,37 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+  @Test
+  def testAggregateEmptyDataSets(): Unit = {
--- End diff --

Table API and SQL are internally represented the same way. We can drop the expensive ITCase for the Table API and add tests that check the plan based on `TableTestBase` for the Table API instead.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15702148#comment-15702148 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2840

Hi, I updated the PR: I added a new rule for non-grouped aggregates and added `AggregationTest` to check the query plans.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690049#comment-15690049 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2840

If we fix this issue with a `RelOptRule`, we can do all tests with the `TableTestBase`, because the `DataSetValues` with the empty record will show up in the plan.
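The argument here is that once an optimizer rule adds the null row, it becomes a node of the plan, so a plan-comparison test can detect it without running a job. The toy below illustrates that idea with a hypothetical `Node` class and an invented indented explain format. The node names echo the thread (`DataSetValues`, `DataSetUnion`, `DataSetAggregate`), and `Scan` is a made-up placeholder for the table source; the output is not the string format that `TableTestBase` actually compares.

```scala
// Hypothetical plan node with a simple indented explain(); not Flink's plan format.
final case class Node(name: String, attrs: List[String], inputs: List[Node] = Nil) {
  def explain: String = render(0).mkString("\n")

  // One line per node; children are indented two spaces deeper than their parent.
  private def render(depth: Int): List[String] =
    ("  " * depth + name + attrs.mkString("(", ", ", ")")) :: inputs.flatMap(_.render(depth + 1))
}

val scan = Node("Scan", List("table=[MyTable]"))
val nullRow = Node("DataSetValues", List("tuples=[[null, null, null]]"))
val union = Node("DataSetUnion", List("union=[a, b, c]"), List(nullRow, scan))
val plan = Node("DataSetAggregate", List("select=[SUM(b), COUNT(c)]"), List(union))

// A plan test can compare the whole string, or just check that the NULL row is present.
println(plan.explain)
```

A fix applied only inside `translateToPlan` would leave no such node in the optimized plan, which is why plan-based tests could not observe it.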
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690003#comment-15690003 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2840

Hi @StephanEwen, I found a way to check these changes via `TableTestBase`, as @fhueske suggested:

```scala
@Test
def testAggregateQueryBatchSQL(): Unit = {
  val util = batchTestUtil()
  util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)

  val sqlQuery = (
    "SELECT avg(a),sum(b),count(c) FROM MyTable",
    "SELECT avg(a),sum(b),count(c) FROM MyTable WHERE a = 1"
  )

  val calcNode = unaryNode(
    "DataSetCalc",
    batchTableNode(0),
    term("select", "a", "b", "c"),
    term("where", "=(a, 1)")
  )

  val aggregate = (
    unaryNode(
      "DataSetAggregate",
      batchTableNode(0),
      term("select",
        "AVG(a) AS EXPR$0",
        "SUM(b) AS EXPR$1",
        "COUNT(c) AS EXPR$2")
    ),
    unaryNode(
      "DataSetAggregate",
      calcNode,
      term("select",
        "AVG(a) AS EXPR$0",
        "SUM(b) AS EXPR$1",
        "COUNT(c) AS EXPR$2")
    )
  )
  util.verifySql(sqlQuery._1, aggregate._1)
  util.verifySql(sqlQuery._2, aggregate._2)
}
```

However, this test only checks the SQL plan of the aggregate query, so I will look for other ways to verify the change with unit tests. @StephanEwen, @fhueske, what is your opinion on this?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689878#comment-15689878 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2840

Is it possible to check this via unit tests rather than ITCases?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15689481#comment-15689481 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r89277461

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -157,4 +161,41 @@ class DataSetAggregate(
         case _ => result
       }
   }
+
+  /**
+    * Dummy [[Row]] into a [[DataSet]] for result after map operations.
+    * @param mapOperator after which insert dummy records
+    * @param tableEnv [[BatchTableEnvironment]] for getting rel builder and type factory
+    * @tparam IN mapOperator input type
+    * @tparam OUT mapOperator output type
+    * @return DataSet of type Row is contains null literals for columns
+    */
+  private def dummyRow[IN,OUT](
+      mapOperator: MapOperator[IN,OUT],
+      tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+    val builder: RelDataTypeFactory.FieldInfoBuilder = getCluster.getTypeFactory.builder
+    val rowInfo = mapOperator.getResultType.asInstanceOf[RowTypeInfo]
+
+    val nullLiterals :ImmutableList[ImmutableList[RexLiteral]] =
+      ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+        for (fieldName <- rowInfo.getFieldNames)
+          yield {
+            val columnType = tableEnv.getTypeFactory
+              .createTypeFromTypeInfo(rowInfo.getTypeAt(fieldName))
+            builder.add(fieldName, columnType)
+            tableEnv.getRelBuilder.getRexBuilder
+              .makeLiteral(null,columnType,false).asInstanceOf[RexLiteral]
+          }))
+
+    val dataType = builder.build()
+
+    val relNode = RelFactories.DEFAULT_VALUES_FACTORY
+      .createValues(getCluster, dataType, nullLiterals)
+
+    DataSetValuesRule.INSTANCE.asInstanceOf[DataSetValuesRule]
--- End diff --

Sorry, I don't understand your point. Could you explain, please? Do you propose adding a new `DataSetNullValues extends LogicalAggregate with DataSetRel` so that it can be translated into a `DataSet`, and creating a rule that initializes the new `DataSetNullValues`? Is that correct? Or could I use `DataSetValues` directly? For example:

```scala
val relNode = RelFactories.DEFAULT_VALUES_FACTORY
  .createValues(getCluster, dataType, nullLiterals)

new DataSetValues(
  cluster,
  relNode.getTraitSet.replace(DataSetConvention.INSTANCE),
  dataType,
  nullLiterals,
  "DataSetNullValuesRule"
).translateToPlan(tableEnv, Some(TypeConverter.DEFAULT_ROW_TYPE)).asInstanceOf[DataSet[Row]]
```
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686433#comment-15686433 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user ex00 commented on the issue:

https://github.com/apache/flink/pull/2840

Hi @fhueske

>Actually, I think implementing the fix as an optimizer rule would be the nicer solution. In that case we could transform one of the ITCases into a test that extends the TableTestBase. What do you think?

I think it is a good idea. I can create `AggregationTest extends TableTestBase`, in which we verify the plans of SQL queries that contain aggregate functions.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684617#comment-15684617 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88978610

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -157,4 +161,41 @@ class DataSetAggregate(
         case _ => result
       }
   }
+
+  /**
+    * Dummy [[Row]] into a [[DataSet]] for result after map operations.
+    * @param mapOperator after which insert dummy records
+    * @param tableEnv [[BatchTableEnvironment]] for getting rel builder and type factory
+    * @tparam IN mapOperator input type
+    * @tparam OUT mapOperator output type
+    * @return DataSet of type Row is contains null literals for columns
+    */
+  private def dummyRow[IN,OUT](
+      mapOperator: MapOperator[IN,OUT],
+      tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+    val builder: RelDataTypeFactory.FieldInfoBuilder = getCluster.getTypeFactory.builder
+    val rowInfo = mapOperator.getResultType.asInstanceOf[RowTypeInfo]
+
+    val nullLiterals :ImmutableList[ImmutableList[RexLiteral]] =
+      ImmutableList.of(ImmutableList.copyOf[RexLiteral](
+        for (fieldName <- rowInfo.getFieldNames)
+          yield {
+            val columnType = tableEnv.getTypeFactory
+              .createTypeFromTypeInfo(rowInfo.getTypeAt(fieldName))
+            builder.add(fieldName, columnType)
+            tableEnv.getRelBuilder.getRexBuilder
+              .makeLiteral(null,columnType,false).asInstanceOf[RexLiteral]
+          }))
+
+    val dataType = builder.build()
+
+    val relNode = RelFactories.DEFAULT_VALUES_FACTORY
+      .createValues(getCluster, dataType, nullLiterals)
+
+    DataSetValuesRule.INSTANCE.asInstanceOf[DataSetValuesRule]
--- End diff --

Can we directly create a `ValuesInputFormat` as in `DataSetValues`? Using the `DataSetValuesRule` outside of the optimizer does not seem like a clean design.

Alternatively, we can think about implementing the whole fix as a `RelOptRule` which injects a `LogicalValues` with a null row in front of a `LogicalAggregate` without a grouping set. In addition, we would need to make sure that a `LogicalAggregate` is only translated into a `DataSetAggregate` if the `LogicalValues` exists.
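The alternative proposed here has two parts: a rule that injects a single-NULL-row values node in front of every non-grouped aggregate, and a guard so that such an aggregate is only translated once that node is present. A toy sketch of both parts follows, assuming plans are simple immutable trees; `Rel`, `Scan`, `NullValues`, `Union`, `Aggregate`, `hasNullRow`, `injectNullRow` and `canTranslate` are hypothetical names, not Calcite or Flink APIs.

```scala
// Hypothetical, minimal plan nodes (not Calcite's LogicalValues/LogicalUnion/LogicalAggregate).
sealed trait Rel { def arity: Int }
final case class Scan(table: String, arity: Int) extends Rel
final case class NullValues(arity: Int) extends Rel // exactly one row, every field NULL
final case class Union(first: Rel, second: Rel) extends Rel { def arity: Int = first.arity }
final case class Aggregate(groupKeys: List[Int], aggCalls: Int, input: Rel) extends Rel {
  def arity: Int = groupKeys.size + aggCalls
}

// True if the aggregate input already starts with the injected NULL row.
def hasNullRow(input: Rel): Boolean = input match {
  case Union(NullValues(_), _) => true
  case _                       => false
}

// Part 1: rewrite a non-grouped aggregate so that it reads (NULL row UNION ALL input).
// The hasNullRow check makes the rewrite idempotent: re-applying it adds no second NULL row.
def injectNullRow(agg: Aggregate): Aggregate =
  if (agg.groupKeys.nonEmpty || hasNullRow(agg.input)) agg
  else agg.copy(input = Union(NullValues(agg.input.arity), agg.input))

// Part 2: grouped aggregates translate as before; non-grouped ones only after part 1.
def canTranslate(agg: Aggregate): Boolean =
  agg.groupKeys.nonEmpty || hasNullRow(agg.input)

val scan = Scan("MyTable", 3)
val global = Aggregate(Nil, 2, scan)
val rewritten = injectNullRow(global)
println(rewritten) // Aggregate(List(),2,Union(NullValues(3),Scan(MyTable,3)))
```

Grouped aggregates are left untouched because a group that has no input rows simply produces no output group, so no NULL row is needed there.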
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684619#comment-15684619 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88978720

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala ---
@@ -258,4 +258,42 @@ class AggregationsITCase(
     // must fail. grouping sets are not supported
     tEnv.sql(sqlQuery).toDataSet[Row]
   }
+
+  @Test
+  def testDummyRecords(): Unit = {
--- End diff --

rename method to `testAggregateEmptyDataSets`.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684616#comment-15684616 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88979147

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala ---
@@ -400,5 +400,36 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+  @Test
+  def testDummyRecords(): Unit = {
--- End diff --

please rename method
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684618#comment-15684618 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88979272

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -133,7 +137,7 @@ class DataSetAggregate(
       else {
         // global aggregation
         val aggOpName = s"select:($aggString)"
-        mappedInput.asInstanceOf[DataSet[Row]]
+        mappedInput.asInstanceOf[DataSet[Row]].union(dummyRow(mappedInput,tableEnv))
--- End diff --

Please add a space after the comma.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15684615#comment-15684615 ]

ASF GitHub Bot commented on FLINK-4832:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2840#discussion_r88977919

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -157,4 +161,41 @@ class DataSetAggregate(
         case _ => result
       }
   }
+
+  /**
+    * Dummy [[Row]] into a [[DataSet]] for result after map operations.
+    * @param mapOperator after which insert dummy records
+    * @param tableEnv [[BatchTableEnvironment]] for getting rel builder and type factory
+    * @tparam IN mapOperator input type
+    * @tparam OUT mapOperator output type
+    * @return DataSet of type Row is contains null literals for columns
+    */
+  private def dummyRow[IN,OUT](
--- End diff --

I would not call this "Dummy row" but "Null row". Dummy row sounds like a useless mock-up, but our null row has a good purpose.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683424#comment-15683424 ] Anton Mushin commented on FLINK-4832:
-
Thanks for your reply. Done :)
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683420#comment-15683420 ] ASF GitHub Bot commented on FLINK-4832:
---
GitHub user ex00 opened a pull request:

https://github.com/apache/flink/pull/2840

[FLINK-4832] Count/Sum 0 elements

Hello. Currently, if the `AggregateDataSet` is empty, we are unable to count or sum up 0 elements. These changes make the aggregate functions return the correct result by unioning a dummy row with the `AggregateDataSet`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ex00/flink FLINK-4832

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2840.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2840

commit 62f5fd6a0f6f808126d079353b1f1d9976ccac35
Author: Anton Mushin
Date: 2016-11-21T11:49:41Z

[FLINK-4832] Count/Sum 0 elements

aggregateDataSet union with dataSet is contains dummy records for correct to calculate aggregate functions if source aggregateDataSet is empty
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683017#comment-15683017 ] Fabian Hueske commented on FLINK-4832:
--
The overall approach looks good, IMO. Can you open a pull request?

Thanks, Fabian
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15682939#comment-15682939 ] Anton Mushin commented on FLINK-4832:
-
Hello everyone, I've updated the implementation according to the changes in FLINK-4263. Could somebody please check the [changes|https://github.com/apache/flink/compare/master...ex00:FLINK-4832] and tell me whether the idea is correct?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15676435#comment-15676435 ] Anton Mushin commented on FLINK-4832:
-
Hi [~twalthr], I'm having trouble with the following code:
{code:java}
@Test
public void testValuesWithCast() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

    String sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
        "(2, cast(2 as BIGINT))," +
        "(3, cast(3 as BIGINT))";

    Table result = tableEnv.sql(sqlQuery);

    DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
    resultSet.print();
    List<Row> results = resultSet.collect();
    String expected = "1,1\n2,2\n3,3";
    compareResultAsText(results, expected);
}
{code}
I get the following output:
{noformat}
1,1
2,2
java.lang.AssertionError: Wrong number of elements result
Expected :3
Actual :2
{noformat}
Should I avoid the {{cast}} operator in {{VALUES}}, or is this a bug?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667131#comment-15667131 ] Anton Mushin commented on FLINK-4832:
-
OK. Could you check this [commit|https://github.com/ex00/flink/commit/c93071585ebb21453b22c9c9d102964af06bf45a] and tell me whether it is the right idea for implementing this issue?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15666782#comment-15666782 ] Timo Walther commented on FLINK-4832:
-
We should first fix FLINK-4263 before we implement this issue. I will assign it to me.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652028#comment-15652028 ] Fabian Hueske commented on FLINK-4832:
--
It is true that Flink does not support {{null}} fields in record types, i.e., Flink's Java Tuples and Scala's Tuples. However, the {{Row}} type used for aggregations does support {{null}} fields. Have a look at how the {{DataSetValues}} class creates a {{DataSet}} for a given set of row values. This technique and the {{ValuesInputFormat}} can be used to create a DataSet with a single row with all fields being {{null}}.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650089#comment-15650089 ] Anton Mushin commented on FLINK-4832:
-
Hello [~fhueske]
bq. We need to inject a Union before the Map with static data set with a single record, that contains only null values for each field.

How can I create a DataSet with null values for each field? As I understand it, Flink does not support empty DataSets or records with null values. For example:
{code}
val ds = env.fromElements(
    (1: Byte, 1: Short),
    (2: Byte, 2: Short),
    (null, null))
  .toTable(tEnv, 'a, 'b)
// ==> Caused by: org.apache.flink.api.table.TableException: Unsupported data type encountered: ANY

// file is empty
env.readCsvFile[(Byte, Short)](file, includedFields = Array(0, 1)).collect()
// ==> org.apache.flink.api.common.io.ParseException: Row too short:
{code}
Do we need to add support for empty DataSets as part of this issue?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626549#comment-15626549 ] Fabian Hueske commented on FLINK-4832:
--
Currently, a {{DataSetAggregate}} is translated into a {{MapFunction}} followed by a {{GroupReduceFunction}}. The {{MapFunction}} initializes the aggregates and the {{GroupReduceFunction}} computes the aggregates.

I think we need to do the following. We need to inject a {{Union}} before the Map with a static data set with a single record that contains only null values for each field. The {{CountAggregate}} initializes to {{0}} (which is what we want); all other functions initialize to {{null}} and ignore the record.
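The union-with-a-null-row plan described in this comment can be modeled in plain Java, with no Flink dependency. This is a minimal sketch, and everything in it is hypothetical. Rows are modeled as `Object[]`. `Partial`, `initiate`, `merge`, `aggregate` and `rows` are illustrative names, not Flink's `CountAggregate`/`SumAggregate` classes. The sketch assumes that COUNT counts the non-null values of its column, so the all-null row contributes a count of 0, and that SUM stays null until it sees a value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of a global aggregation that unions the input
// with one all-null row, maps each row to a partial, and reduces the partials.
// Models COUNT(col) and SUM(col) over a single Long column; not Flink APIs.
class NullRowUnionSketch {

    static final class Partial {
        final long count; // COUNT(col): number of non-null values seen
        final Long sum;   // SUM(col): stays null until a non-null value is seen

        Partial(long count, Long sum) {
            this.count = count;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return count + "," + sum;
        }
    }

    // Map step: a null field initiates COUNT to 0 and SUM to null,
    // so the all-null row contributes nothing to either aggregate.
    static Partial initiate(Object[] row) {
        Long value = (Long) row[0];
        return value == null ? new Partial(0L, null) : new Partial(1L, value);
    }

    // Reduce step: counts add up; SUM ignores null partials.
    static Partial merge(Partial a, Partial b) {
        Long sum;
        if (a.sum == null) {
            sum = b.sum;
        } else if (b.sum == null) {
            sum = a.sum;
        } else {
            sum = a.sum + b.sum;
        }
        return new Partial(a.count + b.count, sum);
    }

    // Unions the input with one null row of the given arity, then maps and reduces.
    static Partial aggregate(List<Object[]> input, int arity) {
        List<Object[]> unioned = new ArrayList<>(input);
        unioned.add(new Object[arity]); // every field of the null row is null
        Partial acc = null;
        for (Object[] row : unioned) {
            Partial p = initiate(row);
            acc = (acc == null) ? p : merge(acc, p);
        }
        return acc; // never null: the union guarantees at least one record
    }

    // Helper that builds single-column rows from the given values.
    static List<Object[]> rows(Long... values) {
        List<Object[]> result = new ArrayList<>();
        for (Long v : values) {
            result.add(new Object[] {v});
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(rows(), 1));       // prints 0,null
        System.out.println(aggregate(rows(3L, 4L), 1)); // prints 2,7
    }
}
```

The null row guarantees that the reduce step always receives at least one record. Because the row carries no values, it does not change the result for non-empty inputs.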
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15625448#comment-15625448 ] Anton Mushin commented on FLINK-4832:
-
Hi everyone. I would like to implement this issue and have some questions:
1. Is it necessary to add a new method to DataSet to check whether a set is empty?
2. What should the dummy record be? An empty list, or something else?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15605349#comment-15605349 ] Aljoscha Krettek commented on FLINK-4832:
-
+1 That's exactly what I said earlier ... ;-)
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15602184#comment-15602184 ] Fabian Hueske commented on FLINK-4832:
--
[~jark] is right. A zero count is only necessary for non-grouped aggregates. We should check the semantics of the remaining aggregation functions for this case as well.
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15602113#comment-15602113 ] Jark Wu commented on FLINK-4832:

I tried counting the elements of an empty table in MySQL. Without GROUP BY, the result is zero. With GROUP BY, the result is empty. So maybe unioning a dummy record could work?
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15601955#comment-15601955 ] Aljoscha Krettek commented on FLINK-4832:
-
Counting {{0}} elements using a union with a dummy would only be possible on a global aggregation, not on a keyed aggregation. For a keyed aggregation you would have to insert a dummy for every key, but what is "every key"?
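The difference between global and keyed aggregations, together with Jark Wu's MySQL observation, can be shown with a small Java sketch. The sketch is hypothetical. Rows are `(key, value)` pairs modeled as `String[2]`, and `globalCount`, `groupedCount` and `withNullRow` are illustrative names. It assumes COUNT counts non-null values. A null key is reported under a placeholder group name `"<null>"`, because `TreeMap` rejects null keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: rows are (key, value) pairs modeled as String[2].
// COUNT(value) counts non-null values; none of these names are Flink APIs.
class GlobalVsGroupedSketch {

    // Global COUNT(value): always exactly one result, 0 for empty input.
    static long globalCount(List<String[]> rows) {
        long count = 0;
        for (String[] row : rows) {
            if (row[1] != null) {
                count++;
            }
        }
        return count;
    }

    // COUNT(value) ... GROUP BY key: one result per key that actually occurs.
    // A null key is reported under the placeholder group name "<null>".
    static Map<String, Long> groupedCount(List<String[]> rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] row : rows) {
            String key = row[0] == null ? "<null>" : row[0];
            counts.merge(key, row[1] == null ? 0L : 1L, Long::sum);
        }
        return counts;
    }

    // Appends one all-null row, as the union-based approach would.
    static List<String[]> withNullRow(List<String[]> rows) {
        List<String[]> copy = new ArrayList<>(rows);
        copy.add(new String[2]);
        return copy;
    }

    public static void main(String[] args) {
        List<String[]> empty = new ArrayList<>();
        System.out.println(groupedCount(empty));              // prints {}
        System.out.println(globalCount(withNullRow(empty)));  // prints 0
        System.out.println(groupedCount(withNullRow(empty))); // prints {<null>=0}
    }
}
```

On empty input, the grouped count returns no groups, which matches the GROUP BY behavior described for MySQL. For the global case, appending the null row produces the desired single result of 0. For the grouped case, the same trick produces a spurious extra group. This is why the null-row union only fits non-grouped aggregates.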
[jira] [Commented] (FLINK-4832) Count/Sum 0 elements
[ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591955#comment-15591955 ] Anton Mushin commented on FLINK-4832:
-
Hello,
I think {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} also needs to be changed, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is only called if there are elements in {{inputData}}:
{code}
TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);

for (IN element : inputData) {
    IN inCopy = inSerializer.copy(element);
    OUT out = function.map(inCopy);
    result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} is changed, for example, to
{code}
override def initiate(partial: Row): Unit = {
  // cast 0 to the type T of the concrete SumAggregate[T] subclass
  partial.setField(sumIndex, 0.asInstanceOf[T])
}
{code}
then the following tests pass:
{code}
@Test
def testDataSetAggregation(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)

  val sqlQuery = "SELECT sum(_1) FROM MyTable"

  val ds = CollectionDataSets.get3TupleDataSet(env)
  tEnv.registerDataSet("MyTable", ds)

  val result = tEnv.sql(sqlQuery)

  val expected = "231"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testSumNullElements(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)

  // the filter _1 = 4 matches no rows, so every sum is computed over an empty input
  val sqlQuery = "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
    "FROM (select * from MyTable where _1 = 4)"

  val ds = env.fromElements(
    (1: Byte, 2L, 1D, 1f, 1, 1: Short),
    (2: Byte, 2L, 1D, 1f, 1, 1: Short))
  tEnv.registerDataSet("MyTable", ds)

  val result = tEnv.sql(sqlQuery)

  val expected = "0,0,0.0,0.0,0,0"
  val results = result.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}
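The point about the collection-execution loop can be made concrete with a hypothetical, Flink-free Java sketch. It mirrors the shape of the quoted loop, with the serializer copies omitted. `mapAll` and `initiateAndPrepare` are illustrative names, not Flink methods. The sketch assumes the map function is invoked only from inside that per-element loop. Because the loop body never executes for an empty input, initiating the partial sum to 0 has no effect on empty inputs by itself. This is why the comment proposes changing the collection executor as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch mirroring the shape of the quoted loop (serializer copies omitted):
// the map function is applied once per input element and nowhere else.
class PerElementMapSketch {

    static <IN, OUT> List<OUT> mapAll(List<IN> inputData, Function<IN, OUT> function) {
        List<OUT> result = new ArrayList<>();
        for (IN element : inputData) {
            result.add(function.apply(element));
        }
        return result;
    }

    // Stand-in for a map that initiates a partial SUM to 0 and then adds the element.
    static Long initiateAndPrepare(Long value) {
        long partial = 0L; // the proposed "initiate to 0"
        return partial + value;
    }

    public static void main(String[] args) {
        List<Long> fromEmpty = mapAll(new ArrayList<Long>(), PerElementMapSketch::initiateAndPrepare);
        List<Long> fromTwo = mapAll(List.of(2L, 3L), PerElementMapSketch::initiateAndPrepare);
        System.out.println(fromEmpty.size()); // prints 0: initiateAndPrepare never ran
        System.out.println(fromTwo);          // prints [2, 3]
    }
}
```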