[ https://issues.apache.org/jira/browse/BEAM-5335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606168#comment-16606168 ]

Anton Kedin commented on BEAM-5335:
-----------------------------------

We also need a test that chains multiple queries.
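
For illustration, such a test could exercise a pipeline roughly like the following. This is only a sketch in the Scio style of the snippets quoted below; {{inSchema}}, {{fooToRow}}, and the queries themselves are hypothetical, and the point is that the second {{SqlTransform}} must see the output schema of the first without a manual {{setCoder}} in between:

{code}
// Sketch only: chain two SqlTransform queries so the second consumes the
// first query's output. inSchema and fooToRow follow the report below;
// the queries are illustrative.
sc.avroFile[Foo](args("input"))
  .map(fooToRow)
  .setCoder(inSchema.getRowCoder)
  .applyTransform(SqlTransform.query(
    "SELECT x, COUNT(1) AS c FROM PCOLLECTION GROUP BY x"))
  // With this bug fixed, no manual setCoder should be needed here:
  .applyTransform(SqlTransform.query("SELECT SUM(c) FROM PCOLLECTION"))
  .saveAsTextFile(args("output"))
{code}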

> [SQL] Output schema is set incorrectly
> --------------------------------------
>
>                 Key: BEAM-5335
>                 URL: https://issues.apache.org/jira/browse/BEAM-5335
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Priority: Major
>
> *From: 
> https://stackoverflow.com/questions/52181795/how-do-i-get-an-output-schema-for-an-apache-beam-sql-query
>  :*
> I've been playing with the Beam SQL DSL and I'm unable to use the output from 
> a query without manually providing code that's aware of the output schema. 
> Can I infer the output schema rather than hardcoding it?
> Neither the walkthrough nor the examples actually use the output from a query. 
> I'm using Scio rather than the plain Java API to keep the code relatively 
> readable and concise; I don't think that makes a difference for this question.
> Here's an example of what I mean.
> Given an input schema inSchema and a data source that is mapped onto a Row as 
> follows (Avro-based in this example, but again, I don't think that matters):
> {code}
> sc.avroFile[Foo](args("input"))
>    .map(fooToRow)
>    .setCoder(inSchema.getRowCoder)
>    .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
>    .saveAsTextFile(args("output"))
> {code}
> Running this pipeline results in a KryoException as follows:
> {code}
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> fieldIndices (org.apache.beam.sdk.schemas.Schema)
> schema (org.apache.beam.sdk.values.RowWithStorage)
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> {code}
> However, if I insert a RowCoder matching the SQL output, in this case a 
> single int64 count column:
> {code}
>    ...snip...
>    .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
>    .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
>    .saveAsTextFile(args("output"))
> {code}
> Now the pipeline runs just fine.
> Having to manually tell the pipeline how to encode the SQL output seems 
> unnecessary, given that we specify the input schema/coder(s) and a query. It 
> seems to me that we should be able to infer the output schema from that - but 
> I can't see how, other than maybe using Calcite directly?
> Before raising a ticket on the Beam Jira, I thought I'd check I wasn't 
> missing something obvious!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
