[ https://issues.apache.org/jira/browse/BEAM-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on BEAM-5384 started by Anton Kedin.
-----------------------------------------
> [SQL] Calcite optimizes away LogicalProject
> -------------------------------------------
>
>                 Key: BEAM-5384
>                 URL: https://issues.apache.org/jira/browse/BEAM-5384
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> *From [https://stackoverflow.com/questions/52313324/beam-sql-wont-work-when-using-aggregation-in-statement-cannot-plan-execution]:*
>
> I have a basic Beam pipeline that reads from GCS, runs a Beam SQL transform, and writes the results to BigQuery.
>
> When I don't do any aggregation in my SQL statement, it works fine:
> {code:java}
> ..
> PCollection<Row> outputStream =
>     sqlRows.apply(
>         "sql_transform",
>         SqlTransform.query("select views from PCOLLECTION"));
> outputStream.setCoder(SCHEMA.getRowCoder());
> ..
> {code}
> However, when I try to aggregate with a sum, it fails (throws a CannotPlanException):
> {code:java}
> ..
> PCollection<Row> outputStream =
>     sqlRows.apply(
>         "sql_transform",
>         SqlTransform.query(
>             "select wikimedia_project, sum(views) from PCOLLECTION group by wikimedia_project"));
> outputStream.setCoder(SCHEMA.getRowCoder());
> ..
> {code}
> Stacktrace:
> {code:java}
> Step #1: 11:47:37,562 0 [main] INFO org.apache.beam.runners.dataflow.DataflowRunner - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 117 files. Enable logging at DEBUG level to see which files will be staged.
> Step #1: 11:47:39,845 2283 [main] INFO org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQL:
> Step #1: SELECT `PCOLLECTION`.`wikimedia_project`, SUM(`PCOLLECTION`.`views`)
> Step #1: FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
> Step #1: GROUP BY `PCOLLECTION`.`wikimedia_project`
> Step #1: 11:47:40,387 2825 [main] INFO org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQLPlan>
> Step #1: LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
> Step #1:   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
> Step #1:
> Step #1: Exception in thread "main" org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: Node [rel#7:Subset#1.BEAM_LOGICAL.[]] could not be implemented; planner state:
> Step #1:
> Step #1: Root: rel#7:Subset#1.BEAM_LOGICAL.[]
> Step #1: Original rel:
> Step #1: LogicalAggregate(subset=[rel#7:Subset#1.BEAM_LOGICAL.[]], group=[{0}], EXPR$1=[SUM($1)]): rowcount = 10.0, cumulative cost = {11.375000476837158 rows, 0.0 cpu, 0.0 io}, id = 5
> Step #1:   BeamIOSourceRel(subset=[rel#4:Subset#0.BEAM_LOGICAL.[]], table=[[beam, PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 2
> Step #1:
> Step #1: Sets:
> Step #1: Set#0, type: RecordType(VARCHAR wikimedia_project, BIGINT views)
> Step #1:   rel#4:Subset#0.BEAM_LOGICAL.[], best=rel#2, importance=0.81
> Step #1:     rel#2:BeamIOSourceRel.BEAM_LOGICAL.[](table=[beam, PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
> Step #1:   rel#10:Subset#0.ENUMERABLE.[], best=rel#9, importance=0.405
> Step #1:     rel#9:BeamEnumerableConverter.ENUMERABLE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[]), rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
> Step #1: Set#1, type: RecordType(VARCHAR wikimedia_project, BIGINT EXPR$1)
> Step #1:   rel#6:Subset#1.NONE.[], best=null, importance=0.9
> Step #1:     rel#5:LogicalAggregate.NONE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[],group={0},EXPR$1=SUM($1)), rowcount=10.0, cumulative cost={inf}
> Step #1:   rel#7:Subset#1.BEAM_LOGICAL.[], best=null, importance=1.0
> Step #1:     rel#8:AbstractConverter.BEAM_LOGICAL.[](input=rel#6:Subset#1.NONE.[],convention=BEAM_LOGICAL,sort=[]), rowcount=10.0, cumulative cost={inf}
> Step #1:
> Step #1:
> Step #1:     at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:448)
> Step #1:     at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:298)
> Step #1:     at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:666)
> Step #1:     at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> Step #1:     at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:336)
> Step #1:     at org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:138)
> Step #1:     at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:105)
> Step #1:     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:96)
> Step #1:     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
> Step #1:     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> Step #1:     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> Step #1:     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338)
> Step #1:     at org.polleyg.TemplatePipeline.main(TemplatePipeline.java:59)
> Step #1: :run FAILED
> Step #1:
> Step #1: FAILURE: Build failed with an exception.
> {code}
> I'm using Beam 2.6.0.
>
> Am I missing something obvious?
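
For anyone trying to reproduce this outside the reporter's project, here is a minimal, self-contained sketch of the failing scenario. It is only a sketch: the SCHEMA definition, the Create-based input, the sample values, and the class name are assumptions filled in from the snippets and the planner log above (which reports the input row type as RecordType(VARCHAR wikimedia_project, BIGINT views)), and it assumes the org.apache.beam.sdk.schemas.Schema builder API available around Beam 2.6.0. Note that the same log reports the aggregate's output row type as RecordType(VARCHAR wikimedia_project, BIGINT EXPR$1), which is not the record type of the input SCHEMA that the quoted snippet passes to setCoder(...).

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class BeamSqlAggregationRepro {

  // Assumed input schema, matching RecordType(VARCHAR wikimedia_project, BIGINT views)
  // that the planner reports for the PCOLLECTION table.
  private static final Schema SCHEMA =
      Schema.builder()
          .addStringField("wikimedia_project")
          .addInt64Field("views")
          .build();

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Stand-in for the GCS read in the original pipeline: a couple of hard-coded rows.
    PCollection<Row> sqlRows =
        pipeline.apply(
            "create_rows",
            Create.of(
                    Row.withSchema(SCHEMA).addValues("en.wikipedia", 2L).build(),
                    Row.withSchema(SCHEMA).addValues("de.wikipedia", 1L).build())
                .withCoder(SCHEMA.getRowCoder()));

    // The aggregating query from the report. Per the stacktrace above, this apply(...)
    // is where the CannotPlanException is thrown, inside SqlTransform.expand(...).
    PCollection<Row> outputStream =
        sqlRows.apply(
            "sql_transform",
            SqlTransform.query(
                "select wikimedia_project, sum(views) from PCOLLECTION group by wikimedia_project"));

    pipeline.run().waitUntilFinish();
  }
}
{code}

Per the report, swapping in the non-aggregating query from the first snippet ("select views from PCOLLECTION") plans and runs fine with the same setup.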

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)