[ https://issues.apache.org/jira/browse/BEAM-5384?focusedWorklogId=158394&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158394 ]
ASF GitHub Bot logged work on BEAM-5384: ---------------------------------------- Author: ASF GitHub Bot Created on: 24/Oct/18 23:34 Start Date: 24/Oct/18 23:34 Worklog Time Spent: 10m Work Description: akedin closed pull request #6816: [BEAM-5384][SQL] Support aggregation without projection URL: https://github.com/apache/beam/pull/6816 This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign (fork) pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java index 24d72c1fd67..d7bd179d236 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule; +import org.apache.beam.sdk.extensions.sql.impl.rule.BeamBasicAggregationRule; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamEnumerableConverterRule; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIntersectRule; @@ -135,6 +136,7 @@ ImmutableList.of( BeamCalcRule.INSTANCE, BeamAggregationRule.INSTANCE, + BeamBasicAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE, BeamIntersectRule.INSTANCE, diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index 68a9f537b2e..7a7e81dc363 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -25,6 +25,7 @@ import com.google.common.collect.Lists; import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.extensions.sql.impl.transform.MultipleAggregationsFn; import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; @@ -63,7 +64,7 @@ /** {@link BeamRelNode} to replace a {@link Aggregate} node. */ public class BeamAggregationRel extends Aggregate implements BeamRelNode { - private WindowFn<Row, IntervalWindow> windowFn; + private @Nullable WindowFn<Row, IntervalWindow> windowFn; private final int windowFieldIndex; public BeamAggregationRel( @@ -74,7 +75,7 @@ public BeamAggregationRel( ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls, - WindowFn<Row, IntervalWindow> windowFn, + @Nullable WindowFn<Row, IntervalWindow> windowFn, int windowFieldIndex) { super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java new file mode 100644 index 00000000000..eb939112bc0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.rule; + +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamAggregationRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.tools.RelBuilderFactory; + +/** + * Aggregation rule that doesn't include projection. + * + * <p>Doesn't support windowing, as we extract window information from projection node. + * + * <p>{@link BeamAggregationRule} supports projection and windowing. + */ +public class BeamBasicAggregationRule extends RelOptRule { + public static final BeamBasicAggregationRule INSTANCE = + new BeamBasicAggregationRule(Aggregate.class, RelFactories.LOGICAL_BUILDER); + + public BeamBasicAggregationRule( + Class<? 
extends Aggregate> aggregateClass, RelBuilderFactory relBuilderFactory) { + super(operand(aggregateClass, operand(TableScan.class, any())), relBuilderFactory, null); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Aggregate aggregate = call.rel(0); + TableScan tableScan = call.rel(1); + + RelNode newTableScan = tableScan.copy(tableScan.getTraitSet(), tableScan.getInputs()); + + call.transformTo( + new BeamAggregationRel( + aggregate.getCluster(), + aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE), + convert( + newTableScan, newTableScan.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), + aggregate.indicator, + aggregate.getGroupSet(), + aggregate.getGroupSets(), + aggregate.getAggCallList(), + null, + -1)); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationArgsAdapter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationArgsAdapter.java index 9f7fb91b697..775c16c8a63 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationArgsAdapter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationArgsAdapter.java @@ -44,7 +44,7 @@ */ static ArgsAdapter of(List<Integer> argList, Schema inputSchema) { if (argList.size() == 0) { - return new ZeroArgsAdapter(inputSchema); + return ZeroArgsAdapter.INSTANCE; } else if (argList.size() == 1) { return new SingleArgAdapter(inputSchema, argList); } else { @@ -53,29 +53,32 @@ static ArgsAdapter of(List<Integer> argList, Schema inputSchema) { } /** - * If SQL aggregation call doesn't have actual arguments, we still pass the value first field to - * the delegate {@link CombineFn}. + * If SQL aggregation call doesn't have actual arguments, we pass an empty row to it. 
+ * + * <p>This is for cases like COUNT(1) which doesn't take any arguments, or COUNT(*) that is a + * special case that returns the same result. In both of these cases we should count all Rows no + * matter whether they have NULLs or not. * - * <p>Note: this is not necessarily the correct thing to do. + * <p>This is a special case of the MultiArgsAdapter below. */ static class ZeroArgsAdapter implements ArgsAdapter { - Schema sourceSchema; + private static final Schema EMPTY_SCHEMA = Schema.builder().build(); + private static final Row EMPTY_ROW = Row.withSchema(EMPTY_SCHEMA).build(); + private static final Coder<Row> EMPTY_ROW_CODER = SchemaCoder.of(EMPTY_SCHEMA); - ZeroArgsAdapter(Schema sourceSchema) { - this.sourceSchema = sourceSchema; - } + static final ZeroArgsAdapter INSTANCE = new ZeroArgsAdapter(); /** Extracts the value from the first field of a row. */ @Nullable @Override public Object getArgsValues(Row input) { - return input.getValue(0); + return EMPTY_ROW; } /** Coder for the first field of a row. */ @Override public Coder getArgsValuesCoder() { - return RowCoder.coderForFieldType(sourceSchema.getField(0).getType()); + return EMPTY_ROW_CODER; } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index 95ec4cc97aa..64eff9beaf3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -612,6 +612,37 @@ public void testSupportsGlobalWindowWithCustomTrigger() throws Exception { pipeline.run(); } + /** Query has all the input fields, so no projection is added. 
*/ + @Test + public void testSupportsAggregationWithoutProjection() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + + Schema schema = + Schema.builder().addInt32Field("f_intGroupingKey").addInt32Field("f_intValue").build(); + + PCollection<Row> inputRows = + pipeline + .apply( + Create.of( + TestUtils.rowsBuilderOf(schema) + .addRows( + 0, 1, + 0, 2, + 1, 3, + 2, 4, + 2, 5) + .getRows())) + .setSchema(schema, SerializableFunctions.identity(), SerializableFunctions.identity()); + + String sql = "SELECT SUM(f_intValue) FROM PCOLLECTION GROUP BY f_intGroupingKey"; + + PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql)); + + PAssert.that(result).containsInAnyOrder(rowsWithSingleIntField("sum", Arrays.asList(3, 3, 9))); + + pipeline.run(); + } + @Test public void testSupportsNonGlobalWindowWithCustomTrigger() { DateTime startTime = new DateTime(2017, 1, 1, 0, 0, 0, 0); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 158394) Time Spent: 1.5h (was: 1h 20m) > [SQL] Calcite optimizes away LogicalProject > ------------------------------------------- > > Key: BEAM-5384 > URL: https://issues.apache.org/jira/browse/BEAM-5384 > Project: Beam > Issue Type: Bug > Components: dsl-sql > Reporter: Anton Kedin > Assignee: Anton Kedin > Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > *From > [https://stackoverflow.com/questions/52313324/beam-sql-wont-work-when-using-aggregation-in-statement-cannot-plan-execution] > :* > I have a basic Beam pipeline that reads from GCS, does a Beam SQL transform > and writes the results to BigQuery. 
> When I don't do any aggregation in my SQL statement it works fine: > {code:java} > .. > PCollection<Row> outputStream = > sqlRows.apply( > "sql_transform", > SqlTransform.query("select views from PCOLLECTION")); > outputStream.setCoder(SCHEMA.getRowCoder()); > .. > {code} > However, when I try to aggregate with a sum then it fails (throws a > CannotPlanException exception): > {code:java} > .. > PCollection<Row> outputStream = > sqlRows.apply( > "sql_transform", > SqlTransform.query("select wikimedia_project, > sum(views) from PCOLLECTION group by wikimedia_project")); > outputStream.setCoder(SCHEMA.getRowCoder()); > .. > {code} > Stacktrace: > {code:java} > Step #1: 11:47:37,562 0 [main] INFO > org.apache.beam.runners.dataflow.DataflowRunner - > PipelineOptions.filesToStage was not specified. Defaulting to files from the > classpath: will stage 117 files. Enable logging at DEBUG level to see which > files will be staged. > Step #1: 11:47:39,845 2283 [main] INFO > org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQL: > Step #1: SELECT `PCOLLECTION`.`wikimedia_project`, SUM(`PCOLLECTION`.`views`) > Step #1: FROM `beam`.`PCOLLECTION` AS `PCOLLECTION` > Step #1: GROUP BY `PCOLLECTION`.`wikimedia_project` > Step #1: 11:47:40,387 2825 [main] INFO > org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQLPlan> > Step #1: LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)]) > Step #1: BeamIOSourceRel(table=[[beam, PCOLLECTION]]) > Step #1: > Step #1: Exception in thread "main" > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: > Node [rel#7:Subset#1.BEAM_LOGICAL.[]] could not be implemented; planner > state: > Step #1: > Step #1: Root: rel#7:Subset#1.BEAM_LOGICAL.[] > Step #1: Original rel: > Step #1: LogicalAggregate(subset=[rel#7:Subset#1.BEAM_LOGICAL.[]], > group=[{0}], EXPR$1=[SUM($1)]): rowcount = 10.0, cumulative cost = > {11.375000476837158 rows, 0.0 cpu, 0.0 io}, id = 5 > Step #1: 
BeamIOSourceRel(subset=[rel#4:Subset#0.BEAM_LOGICAL.[]], > table=[[beam, PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 > rows, 101.0 cpu, 0.0 io}, id = 2 > Step #1: > Step #1: Sets: > Step #1: Set#0, type: RecordType(VARCHAR wikimedia_project, BIGINT views) > Step #1: rel#4:Subset#0.BEAM_LOGICAL.[], best=rel#2, importance=0.81 > Step #1: rel#2:BeamIOSourceRel.BEAM_LOGICAL.[](table=[beam, > PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io} > Step #1: rel#10:Subset#0.ENUMERABLE.[], best=rel#9, importance=0.405 > Step #1: > rel#9:BeamEnumerableConverter.ENUMERABLE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[]), > rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, > 1.7976931348623157E308 cpu, 1.7976931348623157E308 io} > Step #1: Set#1, type: RecordType(VARCHAR wikimedia_project, BIGINT EXPR$1) > Step #1: rel#6:Subset#1.NONE.[], best=null, importance=0.9 > Step #1: > rel#5:LogicalAggregate.NONE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[],group={0},EXPR$1=SUM($1)), > rowcount=10.0, cumulative cost={inf} > Step #1: rel#7:Subset#1.BEAM_LOGICAL.[], best=null, importance=1.0 > Step #1: > rel#8:AbstractConverter.BEAM_LOGICAL.[](input=rel#6:Subset#1.NONE.[],convention=BEAM_LOGICAL,sort=[]), > rowcount=10.0, cumulative cost={inf} > Step #1: > Step #1: > Step #1: at > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:448) > Step #1: at > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:298) > Step #1: at > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:666) > Step #1: at > org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > Step #1: at > 
org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:336) > Step #1: at > org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:138) > Step #1: at > org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:105) > Step #1: at > org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:96) > Step #1: at > org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79) > Step #1: at > org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537) > Step #1: at > org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488) > Step #1: at > org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338) > Step #1: at org.polleyg.TemplatePipeline.main(TemplatePipeline.java:59) > Step #1: :run FAILED > Step #1: > Step #1: FAILURE: Build failed with an exception. > {code} > I'm using Beam 2.6.0 > Am I missing something obvious? -- This message was sent by Atlassian JIRA (v7.6.3#76005)