[ https://issues.apache.org/jira/browse/BEAM-5335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606168#comment-16606168 ]
Anton Kedin commented on BEAM-5335:
-----------------------------------

And we need a test to chain multiple queries.

> [SQL] Output schema is set incorrectly
> --------------------------------------
>
>                 Key: BEAM-5335
>                 URL: https://issues.apache.org/jira/browse/BEAM-5335
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Priority: Major
>
> *From: https://stackoverflow.com/questions/52181795/how-do-i-get-an-output-schema-for-an-apache-beam-sql-query :*
> I've been playing with the Beam SQL DSL and I'm unable to use the output from a query without manually providing code that's aware of the output schema. Can I infer the output schema rather than hardcoding it?
> Neither the walkthrough nor the examples actually use the output from a query. I'm using Scio rather than the plain Java API to keep the code relatively readable and concise; I don't think that makes a difference for this question.
> Here's an example of what I mean. Given an input schema inSchema and some data source that is mapped onto a Row as follows (in this example Avro-based, but again, I don't think that matters):
> {code}
> sc.avroFile[Foo](args("input"))
>   .map(fooToRow)
>   .setCoder(inSchema.getRowCoder)
>   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
>   .saveAsTextFile(args("output"))
> {code}
> Running this pipeline results in a KryoException as follows:
> {code}
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> fieldIndices (org.apache.beam.sdk.schemas.Schema)
> schema (org.apache.beam.sdk.values.RowWithStorage)
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> {code}
> However, inserting a RowCoder matching the SQL output, in this case a single count (int64) column:
> {code}
> ...snip...
> .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
> .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
> .saveAsTextFile(args("output"))
> {code}
> Now the pipeline runs just fine.
> Having to manually tell the pipeline how to encode the SQL output seems unnecessary, given that we specify the input schema/coder(s) and a query. It seems to me that we should be able to infer the output schema from that, but I can't see how, other than maybe by using Calcite directly?
> Before raising a ticket on the Beam Jira, I thought I'd check I wasn't missing something obvious!

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
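The chained-query case the comment asks to test could be sketched in the question's Scio style as two back-to-back SqlTransforms. This is only a sketch under the bug's current behavior: it assumes the same `Foo`/`fooToRow`/`inSchema` helpers as the report above, and the alias `c` and the second output field name `c_plus_one` are hypothetical choices for illustration.

{code}
// Sketch: chaining two SQL queries. Because the output schema is not
// propagated (this bug), a matching RowCoder must be supplied by hand
// after each SqlTransform so the next stage can deserialize the Rows.
sc.avroFile[Foo](args("input"))
  .map(fooToRow)                      // Foo => Row, matching inSchema
  .setCoder(inSchema.getRowCoder)     // coder for the first query's input
  .applyTransform(SqlTransform.query("SELECT COUNT(1) AS c FROM PCOLLECTION"))
  .setCoder(Schema.builder().addInt64Field("c").build().getRowCoder)
  .applyTransform(SqlTransform.query("SELECT c + 1 AS c_plus_one FROM PCOLLECTION"))
  .setCoder(Schema.builder().addInt64Field("c_plus_one").build().getRowCoder)
  .saveAsTextFile(args("output"))
{code}

If the output schema were inferred as this issue proposes, both manual `setCoder` calls after the SqlTransforms would become unnecessary.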