[ 
https://issues.apache.org/jira/browse/BEAM-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16758496#comment-16758496
 ] 

Reuven Lax commented on BEAM-6566:
----------------------------------

Also, this change should have been called out in the release notes. 
Backwards-incompatible changes are allowed right now (since this is all marked 
Experimental), but the notes should have included this explanation. Leaving it 
out of the release notes was an oversight.

> SqlTransform does not work for beam version above 2.6.0
> -------------------------------------------------------
>
>                 Key: BEAM-6566
>                 URL: https://issues.apache.org/jira/browse/BEAM-6566
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>    Affects Versions: 2.7.0, 2.8.0, 2.9.0
>            Reporter: Xuefeng Zhang
>            Assignee: Kenneth Knowles
>            Priority: Critical
>             Fix For: 2.10.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> *Issue*:
> SqlTransform does not work with Beam versions above 2.6.0. From reading the 
> code, those versions call PCollection.getSchema, and that function never 
> works, even in 2.6.0.
> *Details:*
> Beam 2.6.0, class BeamPCollectionTable, which is used by SqlTransform:
> public BeamPCollectionTable(PCollection<Row> upstream) {
>   super(((RowCoder) upstream.getCoder()).getSchema());
>   this.upstream = upstream;
> }
> In Beam 2.7.0 and 2.8.0, however, the class was changed to:
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamPCollectionTable.java
> As a result, the following error appears after upgrading Beam from 2.6.0 to 2.9.0:
> java.lang.IllegalStateException: Cannot call getSchema when there is no schema
>  at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
>  at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
>  at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:111)
>  at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:91)
>  at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76)
>  at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>  at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>  at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
> *Code:*
> Schema schema = Schema.builder()
> .addStringField("weightMarketValue")
> .addStringField("ticker")
> .addStringField("ratingLongTermFitchRaw")
> .build();
> Row row = Row.withSchema(schema)
> .addValues("weightMarketValue 1", "ticker 1", "ratingLongTermFitchRaw 1")
> .build();
> Version 1:
> PCollection<Row> input = p.apply(Create.of(row)
>     .withSchema(schema, SerializableFunctions.identity(), SerializableFunctions.identity())
>     .withCoder(RowCoder.of(schema)));
> PCollection<Row> output = input.apply(SqlTransform.query("select * from PCOLLECTION"));
> Version 2:
> PCollection<Row> input = p.apply(Create.of(row)
>     .withRowSchema(schema)
>     .withCoder(RowCoder.of(schema)));
> PCollection<Row> output = input.apply(SqlTransform.query("select * from PCOLLECTION"));
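
The difference between the two lookups described in the report can be sketched 
with a small stand-alone model. This is not Beam code: ToySchema, ToyRowCoder, 
ToyCollection and both lookup methods are hypothetical stand-ins, and the model 
assumes a collection that carries a row coder but has no schema attached, which 
is what the exception message suggests.

```java
import java.util.Arrays;
import java.util.List;

public class SchemaLookupSketch {

    /** Toy stand-in for a schema: only an ordered list of field names. */
    static final class ToySchema {
        final List<String> fieldNames;

        ToySchema(String... names) {
            this.fieldNames = Arrays.asList(names);
        }
    }

    /** Toy stand-in for a row coder that knows the schema of its rows. */
    static final class ToyRowCoder {
        final ToySchema schema;

        ToyRowCoder(ToySchema schema) {
            this.schema = schema;
        }
    }

    /** Toy stand-in for a collection: always has a coder, may lack a schema. */
    static final class ToyCollection {
        final ToyRowCoder coder;
        final ToySchema schema; // null models "no schema attached"

        ToyCollection(ToyRowCoder coder, ToySchema schema) {
            this.coder = coder;
            this.schema = schema;
        }

        ToySchema getSchema() {
            if (schema == null) {
                throw new IllegalStateException("Cannot call getSchema when there is no schema");
            }
            return schema;
        }
    }

    /** Lookup in the style of the 2.6.0 constructor quoted above: read from the coder. */
    static ToySchema schemaViaCoder(ToyCollection c) {
        return c.coder.schema;
    }

    /** Lookup in the style seen in the stack trace: ask the collection itself. */
    static ToySchema schemaViaCollection(ToyCollection c) {
        return c.getSchema();
    }

    public static void main(String[] args) {
        ToySchema schema = new ToySchema("weightMarketValue", "ticker", "ratingLongTermFitchRaw");
        ToyCollection coderOnly = new ToyCollection(new ToyRowCoder(schema), null);

        // The coder-based lookup succeeds because the coder carries the schema.
        System.out.println(schemaViaCoder(coderOnly).fieldNames);

        // The collection-based lookup fails because no schema was attached.
        try {
            schemaViaCollection(coderOnly);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this model the same input succeeds under the coder-based lookup and fails 
under the collection-based one, which matches the shape of the reported 
regression. Whether the real pipelines above end up in that state is an 
assumption and is not confirmed here.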

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
