[ https://issues.apache.org/jira/browse/BEAM-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-6566:
----------------------------------
    Fix Version/s: 2.10.0

SqlTransform does not work for beam version above 2.6.0
-------------------------------------------------------

                Key: BEAM-6566
                URL: https://issues.apache.org/jira/browse/BEAM-6566
            Project: Beam
         Issue Type: Bug
         Components: dsl-sql
   Affects Versions: 2.7.0, 2.8.0, 2.9.0
           Reporter: Xuefeng Zhang
           Assignee: Kenneth Knowles
           Priority: Critical
            Fix For: 2.10.0

*Issue*:
SqlTransform fails on Beam versions above 2.6.0. From reading the code, those versions call PCollection.getSchema, which never works here, even on 2.6.0.

*Details:*
In Beam 2.6.0, the class BeamPCollectionTable, which SqlTransform uses, reads the schema from the RowCoder:

public BeamPCollectionTable(PCollection<Row> upstream) {
  super(((RowCoder) upstream.getCoder()).getSchema());
  this.upstream = upstream;
}

In Beam 2.7.0 and 2.8.0, it was changed to:
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java

As a result, after upgrading Beam from 2.6.0 to 2.9.0, I got the error below:

java.lang.IllegalStateException: Cannot call getSchema when there is no schema
    at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
    at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:111)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:91)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)

*Code:*
Schema schema = Schema.builder()
    .addStringField("weightMarketValue")
    .addStringField("ticker")
    .addStringField("ratingLongTermFitchRaw")
    .build();
Row row = Row.withSchema(schema)
    .addValues("weightMarketValue 1", "ticker 1", "ratingLongTermFitchRaw 1")
    .build();

// `p` is the pipeline under test; its setup is not shown in the report.

Version 1:

PCollection<Row> input = p.apply(Create.of(row)
    .withSchema(schema, SerializableFunctions.identity(), SerializableFunctions.identity())
    .withCoder(RowCoder.of(schema)));
PCollection<Row> output = input.apply(SqlTransform.query("select * from PCOLLECTION"));

Version 2:

PCollection<Row> input = p.apply(Create.of(row)
    .withRowSchema(schema)
    .withCoder(RowCoder.of(schema)));
PCollection<Row> output = input.apply(SqlTransform.query("select * from PCOLLECTION"));
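Why the 2.6.0 constructor succeeds where the 2.7.0+ constructor fails can be reasoned about with a small model. This is a minimal sketch resting on assumptions that the report does not state: that in Beam 2.7.0 to 2.9.0 a PCollection counts as having a schema only when its coder is a SchemaCoder, that RowCoder is not itself a SchemaCoder in those versions, and that Create's withRowSchema/withSchema work by setting such a coder, so a later withCoder(RowCoder.of(schema)) replaces it. Every class below is a hypothetical stand-in, not a Beam API (Java 16+ for records).

```java
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical, simplified model of the coder/schema relationship described
 * in this issue. These are NOT Beam classes. ASSUMPTION: a collection
 * "has a schema" only if its coder is a SchemaCoder.
 */
public class SchemaCoderModel {

    /** Stand-in for a Beam Schema: an ordered list of field names. */
    public record Schema(List<String> fieldNames) {}

    /** Stand-in for a Beam Coder. */
    public interface Coder {}

    /** Stand-in for RowCoder: it carries a schema but is not a SchemaCoder. */
    public record RowCoder(Schema schema) implements Coder {}

    /** Stand-in for SchemaCoder: the only coder that marks a collection as schema-aware. */
    public record SchemaCoder(Schema schema) implements Coder {}

    /** Stand-in for PCollection<Row>; it only remembers its coder. */
    public record PCollectionModel(Coder coder) {
        public boolean hasSchema() {
            return coder instanceof SchemaCoder;
        }

        /** Throws with the same message as the stack trace in the report. */
        public Schema getSchema() {
            if (!hasSchema()) {
                throw new IllegalStateException("Cannot call getSchema when there is no schema");
            }
            return ((SchemaCoder) coder).schema();
        }
    }

    /** Stand-in for Create.Values: each builder call overwrites the single coder slot. */
    public record CreateModel(Optional<Coder> coder) {
        public static CreateModel empty() {
            return new CreateModel(Optional.empty());
        }

        public CreateModel withRowSchema(Schema schema) {
            return new CreateModel(Optional.of(new SchemaCoder(schema)));
        }

        public CreateModel withCoder(Coder newCoder) {
            return new CreateModel(Optional.of(newCoder));
        }

        public PCollectionModel build() {
            return new PCollectionModel(coder.orElseThrow());
        }
    }

    /** 2.6.0-style lookup: cast the coder to RowCoder and read its schema. */
    public static Schema schemaFromRowCoder(PCollectionModel input) {
        return ((RowCoder) input.coder()).schema();
    }

    /** 2.7.0+-style lookup: ask the collection itself for its schema. */
    public static Schema schemaFromCollection(PCollectionModel input) {
        return input.getSchema();
    }

    /** Returns true if running the action throws IllegalStateException. */
    public static boolean throwsIllegalState(Runnable action) {
        try {
            action.run();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Schema schema = new Schema(List.of("weightMarketValue", "ticker", "ratingLongTermFitchRaw"));

        // Mirrors "Version 2": withRowSchema(schema) followed by withCoder(RowCoder.of(schema)).
        PCollectionModel repro =
            CreateModel.empty().withRowSchema(schema).withCoder(new RowCoder(schema)).build();
        System.out.println("old lookup: " + schemaFromRowCoder(repro).fieldNames());
        System.out.println("new lookup throws: " + throwsIllegalState(() -> schemaFromCollection(repro)));

        // Same chain without the trailing withCoder(...): the SchemaCoder survives.
        PCollectionModel kept = CreateModel.empty().withRowSchema(schema).build();
        System.out.println("new lookup: " + schemaFromCollection(kept).fieldNames());
    }
}
```

Under these assumptions, both versions of the repro end with withCoder(RowCoder.of(schema)), so the 2.6.0 constructor, which casts the coder to RowCoder, still finds the schema, while the 2.7.0+ constructor's getSchema() call throws. The model only suggests, and does not confirm against the real SDK, that omitting the trailing .withCoder(...) would keep the schema attached.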