[ 
https://issues.apache.org/jira/browse/BEAM-5384?focusedWorklogId=158394&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158394
 ]

ASF GitHub Bot logged work on BEAM-5384:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Oct/18 23:34
            Start Date: 24/Oct/18 23:34
    Worklog Time Spent: 10m 
      Work Description: akedin closed pull request #6816: [BEAM-5384][SQL] 
Support aggregation without projection
URL: https://github.com/apache/beam/pull/6816
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based pull request once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 24d72c1fd67..d7bd179d236 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -22,6 +22,7 @@
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamBasicAggregationRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule;
 import 
org.apache.beam.sdk.extensions.sql.impl.rule.BeamEnumerableConverterRule;
 import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIntersectRule;
@@ -135,6 +136,7 @@
       ImmutableList.of(
           BeamCalcRule.INSTANCE,
           BeamAggregationRule.INSTANCE,
+          BeamBasicAggregationRule.INSTANCE,
           BeamSortRule.INSTANCE,
           BeamValuesRule.INSTANCE,
           BeamIntersectRule.INSTANCE,
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 68a9f537b2e..7a7e81dc363 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -25,6 +25,7 @@
 
 import com.google.common.collect.Lists;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.KvCoder;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.MultipleAggregationsFn;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
@@ -63,7 +64,7 @@
 
 /** {@link BeamRelNode} to replace a {@link Aggregate} node. */
 public class BeamAggregationRel extends Aggregate implements BeamRelNode {
-  private WindowFn<Row, IntervalWindow> windowFn;
+  private @Nullable WindowFn<Row, IntervalWindow> windowFn;
   private final int windowFieldIndex;
 
   public BeamAggregationRel(
@@ -74,7 +75,7 @@ public BeamAggregationRel(
       ImmutableBitSet groupSet,
       List<ImmutableBitSet> groupSets,
       List<AggregateCall> aggCalls,
-      WindowFn<Row, IntervalWindow> windowFn,
+      @Nullable WindowFn<Row, IntervalWindow> windowFn,
       int windowFieldIndex) {
 
     super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java
new file mode 100644
index 00000000000..eb939112bc0
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamAggregationRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+/**
+ * Aggregation rule that doesn't include projection.
+ *
+ * <p>Doesn't support windowing, as we extract window information from 
projection node.
+ *
+ * <p>{@link BeamAggregationRule} supports projection and windowing.
+ */
+public class BeamBasicAggregationRule extends RelOptRule {
+  public static final BeamBasicAggregationRule INSTANCE =
+      new BeamBasicAggregationRule(Aggregate.class, 
RelFactories.LOGICAL_BUILDER);
+
+  public BeamBasicAggregationRule(
+      Class<? extends Aggregate> aggregateClass, RelBuilderFactory 
relBuilderFactory) {
+    super(operand(aggregateClass, operand(TableScan.class, any())), 
relBuilderFactory, null);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Aggregate aggregate = call.rel(0);
+    TableScan tableScan = call.rel(1);
+
+    RelNode newTableScan = tableScan.copy(tableScan.getTraitSet(), 
tableScan.getInputs());
+
+    call.transformTo(
+        new BeamAggregationRel(
+            aggregate.getCluster(),
+            aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+            convert(
+                newTableScan, 
newTableScan.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+            aggregate.indicator,
+            aggregate.getGroupSet(),
+            aggregate.getGroupSets(),
+            aggregate.getAggCallList(),
+            null,
+            -1));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationArgsAdapter.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationArgsAdapter.java
index 9f7fb91b697..775c16c8a63 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationArgsAdapter.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationArgsAdapter.java
@@ -44,7 +44,7 @@
    */
   static ArgsAdapter of(List<Integer> argList, Schema inputSchema) {
     if (argList.size() == 0) {
-      return new ZeroArgsAdapter(inputSchema);
+      return ZeroArgsAdapter.INSTANCE;
     } else if (argList.size() == 1) {
       return new SingleArgAdapter(inputSchema, argList);
     } else {
@@ -53,29 +53,32 @@ static ArgsAdapter of(List<Integer> argList, Schema 
inputSchema) {
   }
 
   /**
-   * If SQL aggregation call doesn't have actual arguments, we still pass the 
value first field to
-   * the delegate {@link CombineFn}.
+   * If SQL aggregation call doesn't have actual arguments, we pass an empty 
row to it.
+   *
+   * <p>This is for cases like COUNT(1) which doesn't take any arguments, or 
COUNT(*) that is a
+   * special case that returns the same result. In both of these cases we 
should count all Rows no
+   * matter whether they have NULLs or not.
    *
-   * <p>Note: this is not necessarily the correct thing to do.
+   * <p>This is a special case of the MultiArgsAdapter below.
    */
   static class ZeroArgsAdapter implements ArgsAdapter {
-    Schema sourceSchema;
+    private static final Schema EMPTY_SCHEMA = Schema.builder().build();
+    private static final Row EMPTY_ROW = Row.withSchema(EMPTY_SCHEMA).build();
+    private static final Coder<Row> EMPTY_ROW_CODER = 
SchemaCoder.of(EMPTY_SCHEMA);
 
-    ZeroArgsAdapter(Schema sourceSchema) {
-      this.sourceSchema = sourceSchema;
-    }
+    static final ZeroArgsAdapter INSTANCE = new ZeroArgsAdapter();
 
     /** Extracts the value from the first field of a row. */
     @Nullable
     @Override
     public Object getArgsValues(Row input) {
-      return input.getValue(0);
+      return EMPTY_ROW;
     }
 
     /** Coder for the first field of a row. */
     @Override
     public Coder getArgsValuesCoder() {
-      return RowCoder.coderForFieldType(sourceSchema.getField(0).getType());
+      return EMPTY_ROW_CODER;
     }
   }
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 95ec4cc97aa..64eff9beaf3 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -612,6 +612,37 @@ public void testSupportsGlobalWindowWithCustomTrigger() 
throws Exception {
     pipeline.run();
   }
 
+  /** Query has all the input fields, so no projection is added. */
+  @Test
+  public void testSupportsAggregationWithoutProjection() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    Schema schema =
+        
Schema.builder().addInt32Field("f_intGroupingKey").addInt32Field("f_intValue").build();
+
+    PCollection<Row> inputRows =
+        pipeline
+            .apply(
+                Create.of(
+                    TestUtils.rowsBuilderOf(schema)
+                        .addRows(
+                            0, 1,
+                            0, 2,
+                            1, 3,
+                            2, 4,
+                            2, 5)
+                        .getRows()))
+            .setSchema(schema, SerializableFunctions.identity(), 
SerializableFunctions.identity());
+
+    String sql = "SELECT SUM(f_intValue) FROM PCOLLECTION GROUP BY 
f_intGroupingKey";
+
+    PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
+
+    PAssert.that(result).containsInAnyOrder(rowsWithSingleIntField("sum", 
Arrays.asList(3, 3, 9)));
+
+    pipeline.run();
+  }
+
   @Test
   public void testSupportsNonGlobalWindowWithCustomTrigger() {
     DateTime startTime = new DateTime(2017, 1, 1, 0, 0, 0, 0);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 158394)
    Time Spent: 1.5h  (was: 1h 20m)

> [SQL] Calcite optimizes away LogicalProject
> -------------------------------------------
>
>                 Key: BEAM-5384
>                 URL: https://issues.apache.org/jira/browse/BEAM-5384
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> *From 
> [https://stackoverflow.com/questions/52313324/beam-sql-wont-work-when-using-aggregation-in-statement-cannot-plan-execution]
>  :*
> I have a basic Beam pipeline that reads from GCS, does a Beam SQL transform 
> and writes the results to BigQuery.
> When I don't do any aggregation in my SQL statement it works fine:
> {code:java}
> ..
> PCollection<Row> outputStream =
>                 sqlRows.apply(
>                         "sql_transform",
>                         SqlTransform.query("select views from PCOLLECTION"));
> outputStream.setCoder(SCHEMA.getRowCoder());
> ..
> {code}
> However, when I try to aggregate with a sum, it fails (throws a 
> CannotPlanException):
> {code:java}
> ..
> PCollection<Row> outputStream =
>                 sqlRows.apply(
>                         "sql_transform",
>                         SqlTransform.query("select wikimedia_project, 
> sum(views) from PCOLLECTION group by wikimedia_project"));
> outputStream.setCoder(SCHEMA.getRowCoder());
> ..
> {code}
> Stacktrace:
> {code:java}
> Step #1: 11:47:37,562 0    [main] INFO  
> org.apache.beam.runners.dataflow.DataflowRunner - 
> PipelineOptions.filesToStage was not specified. Defaulting to files from the 
> classpath: will stage 117 files. Enable logging at DEBUG level to see which 
> files will be staged.
> Step #1: 11:47:39,845 2283 [main] INFO  
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQL:
> Step #1: SELECT `PCOLLECTION`.`wikimedia_project`, SUM(`PCOLLECTION`.`views`)
> Step #1: FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
> Step #1: GROUP BY `PCOLLECTION`.`wikimedia_project`
> Step #1: 11:47:40,387 2825 [main] INFO  
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQLPlan>
> Step #1: LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
> Step #1:   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> Step #1: 
> Step #1: Exception in thread "main" 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
>  Node [rel#7:Subset#1.BEAM_LOGICAL.[]] could not be implemented; planner 
> state:
> Step #1: 
> Step #1: Root: rel#7:Subset#1.BEAM_LOGICAL.[]
> Step #1: Original rel:
> Step #1: LogicalAggregate(subset=[rel#7:Subset#1.BEAM_LOGICAL.[]], 
> group=[{0}], EXPR$1=[SUM($1)]): rowcount = 10.0, cumulative cost = 
> {11.375000476837158 rows, 0.0 cpu, 0.0 io}, id = 5
> Step #1:   BeamIOSourceRel(subset=[rel#4:Subset#0.BEAM_LOGICAL.[]], 
> table=[[beam, PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 
> rows, 101.0 cpu, 0.0 io}, id = 2
> Step #1: 
> Step #1: Sets:
> Step #1: Set#0, type: RecordType(VARCHAR wikimedia_project, BIGINT views)
> Step #1:        rel#4:Subset#0.BEAM_LOGICAL.[], best=rel#2, importance=0.81
> Step #1:                rel#2:BeamIOSourceRel.BEAM_LOGICAL.[](table=[beam, 
> PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
> Step #1:        rel#10:Subset#0.ENUMERABLE.[], best=rel#9, importance=0.405
> Step #1:                
> rel#9:BeamEnumerableConverter.ENUMERABLE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[]),
>  rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, 
> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> Step #1: Set#1, type: RecordType(VARCHAR wikimedia_project, BIGINT EXPR$1)
> Step #1:        rel#6:Subset#1.NONE.[], best=null, importance=0.9
> Step #1:                
> rel#5:LogicalAggregate.NONE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[],group={0},EXPR$1=SUM($1)),
>  rowcount=10.0, cumulative cost={inf}
> Step #1:        rel#7:Subset#1.BEAM_LOGICAL.[], best=null, importance=1.0
> Step #1:                
> rel#8:AbstractConverter.BEAM_LOGICAL.[](input=rel#6:Subset#1.NONE.[],convention=BEAM_LOGICAL,sort=[]),
>  rowcount=10.0, cumulative cost={inf}
> Step #1: 
> Step #1: 
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:448)
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:298)
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:666)
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:336)
> Step #1:        at 
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:138)
> Step #1:        at 
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:105)
> Step #1:        at 
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:96)
> Step #1:        at 
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
> Step #1:        at 
> org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> Step #1:        at 
> org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> Step #1:        at 
> org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338)
> Step #1:        at org.polleyg.TemplatePipeline.main(TemplatePipeline.java:59)
> Step #1: :run FAILED
> Step #1: 
> Step #1: FAILURE: Build failed with an exception.
> {code}
> I'm using Beam 2.6.0.
> Am I missing something obvious?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to