[ 
https://issues.apache.org/jira/browse/BEAM-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16615043#comment-16615043
 ] 

Anton Kedin commented on BEAM-5384:
-----------------------------------

Reproduction: 
https://github.com/polleyg/gcp-batch-ingestion-bigquery/blob/beam_sql/src/main/java/org/polleyg/TemplatePipeline.java

> [SQL] Calcite Doesn't optimizes away LogicalProject
> ---------------------------------------------------
>
>                 Key: BEAM-5384
>                 URL: https://issues.apache.org/jira/browse/BEAM-5384
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Priority: Major
>
> *From 
> [https://stackoverflow.com/questions/52313324/beam-sql-wont-work-when-using-aggregation-in-statement-cannot-plan-execution]
>  :*
> I have a basic Beam pipeline that reads from GCS, does a Beam SQL transform 
> and writes the results to BigQuery.
> When I don't do any aggregation in my SQL statement it works fine:
> {code:java}
> ..
> PCollection<Row> outputStream =
>                 sqlRows.apply(
>                         "sql_transform",
>                         SqlTransform.query("select views from PCOLLECTION"));
> outputStream.setCoder(SCHEMA.getRowCoder());
> ..
> {code}
> However, when I try to aggregate with a sum then it fails (throws a 
> CannotPlanException exception):
> {code:java}
> ..
> PCollection<Row> outputStream =
>                 sqlRows.apply(
>                         "sql_transform",
>                         SqlTransform.query("select wikimedia_project, 
> sum(views) from PCOLLECTION group by wikimedia_project"));
> outputStream.setCoder(SCHEMA.getRowCoder());
> ..
> {code}
> Stacktrace:
> {code:java}
> Step #1: 11:47:37,562 0    [main] INFO  
> org.apache.beam.runners.dataflow.DataflowRunner - 
> PipelineOptions.filesToStage was not specified. Defaulting to files from the 
> classpath: will stage 117 files. Enable logging at DEBUG level to see which 
> files will be staged.
> Step #1: 11:47:39,845 2283 [main] INFO  
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQL:
> Step #1: SELECT `PCOLLECTION`.`wikimedia_project`, SUM(`PCOLLECTION`.`views`)
> Step #1: FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
> Step #1: GROUP BY `PCOLLECTION`.`wikimedia_project`
> Step #1: 11:47:40,387 2825 [main] INFO  
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQLPlan>
> Step #1: LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
> Step #1:   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> Step #1: 
> Step #1: Exception in thread "main" 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
>  Node [rel#7:Subset#1.BEAM_LOGICAL.[]] could not be implemented; planner 
> state:
> Step #1: 
> Step #1: Root: rel#7:Subset#1.BEAM_LOGICAL.[]
> Step #1: Original rel:
> Step #1: LogicalAggregate(subset=[rel#7:Subset#1.BEAM_LOGICAL.[]], 
> group=[{0}], EXPR$1=[SUM($1)]): rowcount = 10.0, cumulative cost = 
> {11.375000476837158 rows, 0.0 cpu, 0.0 io}, id = 5
> Step #1:   BeamIOSourceRel(subset=[rel#4:Subset#0.BEAM_LOGICAL.[]], 
> table=[[beam, PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 
> rows, 101.0 cpu, 0.0 io}, id = 2
> Step #1: 
> Step #1: Sets:
> Step #1: Set#0, type: RecordType(VARCHAR wikimedia_project, BIGINT views)
> Step #1:        rel#4:Subset#0.BEAM_LOGICAL.[], best=rel#2, importance=0.81
> Step #1:                rel#2:BeamIOSourceRel.BEAM_LOGICAL.[](table=[beam, 
> PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
> Step #1:        rel#10:Subset#0.ENUMERABLE.[], best=rel#9, importance=0.405
> Step #1:                
> rel#9:BeamEnumerableConverter.ENUMERABLE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[]),
>  rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, 
> 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> Step #1: Set#1, type: RecordType(VARCHAR wikimedia_project, BIGINT EXPR$1)
> Step #1:        rel#6:Subset#1.NONE.[], best=null, importance=0.9
> Step #1:                
> rel#5:LogicalAggregate.NONE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[],group={0},EXPR$1=SUM($1)),
>  rowcount=10.0, cumulative cost={inf}
> Step #1:        rel#7:Subset#1.BEAM_LOGICAL.[], best=null, importance=1.0
> Step #1:                
> rel#8:AbstractConverter.BEAM_LOGICAL.[](input=rel#6:Subset#1.NONE.[],convention=BEAM_LOGICAL,sort=[]),
>  rowcount=10.0, cumulative cost={inf}
> Step #1: 
> Step #1: 
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:448)
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:298)
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:666)
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> Step #1:        at 
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:336)
> Step #1:        at 
> org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:138)
> Step #1:        at 
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:105)
> Step #1:        at 
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:96)
> Step #1:        at 
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
> Step #1:        at 
> org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> Step #1:        at 
> org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> Step #1:        at 
> org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338)
> Step #1:        at org.polleyg.TemplatePipeline.main(TemplatePipeline.java:59)
> Step #1: :run FAILED
> Step #1: 
> Step #1: FAILURE: Build failed with an exception.
> {code}
> I'm using Beam 2.6.0
> Am I missing something obvious?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to