Anton Kedin created BEAM-5335:
---------------------------------

             Summary: [SQL] Output schema is set incorrectly
                 Key: BEAM-5335
                 URL: https://issues.apache.org/jira/browse/BEAM-5335
             Project: Beam
          Issue Type: Bug
          Components: dsl-sql
            Reporter: Anton Kedin


*From: https://stackoverflow.com/questions/52181795/how-do-i-get-an-output-schema-for-an-apache-beam-sql-query*

I've been playing with the Beam SQL DSL and I'm unable to use the output from a 
query without manually writing code that is aware of the output schema. Can 
the output schema be inferred rather than hardcoded?

Neither the walkthrough nor the examples actually use the output from a query. 
I'm using Scio rather than the plain Java API to keep the code relatively 
readable and concise; I don't think that makes a difference for this question.

Here's an example of what I mean.

Given an input schema inSchema and a data source that is mapped onto a Row 
as follows (Avro-based in this example, but again, I don't think that 
matters):

{code}
sc.avroFile[Foo](args("input"))
   .map(fooToRow)                   // Foo -> Row, following inSchema
   .setCoder(inSchema.getRowCoder)  // coder for the input Rows
   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
   .saveAsTextFile(args("output"))
{code}

Running this pipeline results in a KryoException as follows:

{code}
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
fieldIndices (org.apache.beam.sdk.schemas.Schema)
schema (org.apache.beam.sdk.values.RowWithStorage)
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
{code}

However, after inserting a RowCoder matching the SQL output, in this case a 
single INT64 count column:

{code}
   ...snip...
   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
   // hardcoded output schema: a single INT64 "count" column
   .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
   .saveAsTextFile(args("output"))
{code}
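As a hedged sketch of what inference could look like (assuming a Beam version where SqlTransform attaches a schema to its output PCollection, and using Scio's {{internal}} accessor to reach the underlying PCollection; none of this is verified against the version above):

{code}
// Sketch only: assumes SqlTransform sets a schema on its output PCollection,
// so the RowCoder could be derived instead of hardcoded.
val counted = sc.avroFile[Foo](args("input"))
  .map(fooToRow)
  .setCoder(inSchema.getRowCoder)
  .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))

// counted.internal is the underlying Beam PCollection[Row]; if the SQL
// transform propagated a schema, read it back and build the coder from it.
val outSchema = counted.internal.getSchema
counted
  .setCoder(outSchema.getRowCoder)
  .saveAsTextFile(args("output"))
{code}

If the output PCollection carried its schema this way, the explicit builder call in the workaround above would be unnecessary.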

Now the pipeline runs just fine.

Having to manually tell the pipeline how to encode the SQL output seems 
unnecessary, given that we specify the input schema/coder(s) and a query. It 
seems to me that we should be able to infer the output schema from those, but 
I can't see how, other than perhaps by using Calcite directly.

Before raising a ticket on the Beam Jira, I thought I'd check I wasn't missing 
something obvious!





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
