It seems an improvement could be made here: when CREATE_NEVER is set, a table with field-based partitioning shouldn't have to be associated with a schema.
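In the meantime, the workaround reported further down the thread is to keep getSchema implemented and switch the create disposition to CREATE_IF_NEEDED. Below is a minimal sketch of that setup, not the original pipeline. SketchRouter, the (tableName, payload) element type, the columns map, and the "created_at" and "payload" column names are all assumptions made for illustration; the Beam and BigQuery model classes are the ones already used in the thread.

```scala
import com.google.api.services.bigquery.model.{TableFieldSchema, TableRow, TableSchema, TimePartitioning}
import org.apache.beam.sdk.coders.{Coder, StringUtf8Coder}
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, DynamicDestinations, TableDestination}
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.ValueInSingleWindow
import scala.collection.JavaConverters._

// Hypothetical router: elements are (tableName, payload) pairs, and the
// per-table (column name, column type) lists are passed in up front
// instead of being read from BigQuery as in the original code.
class SketchRouter(dataset: String, columns: Map[String, List[(String, String)]])
    extends DynamicDestinations[(String, String), String] {

  // The destination is simply the table name carried by the element.
  override def getDestination(element: ValueInSingleWindow[(String, String)]): String =
    element.getValue._1

  override def getTable(table: String): TableDestination =
    new TableDestination(
      s"$dataset.$table",
      s"Table to store $table",
      // Field-based partitioning; "created_at" is an assumed column name.
      new TimePartitioning().setType("DAY").setField("created_at"))

  // In the thread, this method was only observed to be invoked once the
  // create disposition was CREATE_IF_NEEDED.
  override def getSchema(table: String): TableSchema =
    new TableSchema().setFields(
      columns(table).map { case (name, tpe) =>
        new TableFieldSchema().setName(name).setType(tpe)
      }.asJava)

  override def getDestinationCoder: Coder[String] = StringUtf8Coder.of()
}

object SketchWrite {
  // Builds the write transform with the dispositions that worked in the thread.
  def write(dataset: String,
            columns: Map[String, List[(String, String)]]): Write[(String, String)] =
    BigQueryIO.write[(String, String)]()
      .to(new SketchRouter(dataset, columns))
      .withFormatFunction(new SerializableFunction[(String, String), TableRow] {
        // "payload" is an assumed column; real code would map the JSON fields.
        override def apply(in: (String, String)): TableRow =
          new TableRow().set("payload", in._2)
      })
      .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
      .withMethod(Write.Method.FILE_LOADS)
}
```

With this shape the tables can still exist beforehand; CREATE_IF_NEEDED is used here only because, per the reports below, it is the setting under which the schema reached the load job.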
Cheers

On Sat, Mar 24, 2018 at 2:30 PM, Carlos Alonso <car...@mrcalonso.com> wrote:

> Otherwise the BQ load job fails with the above error as well (Table with
> field based partitioning must have a schema).
>
> On Sat, 24 Mar 2018 at 15:52, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Hmm, glad it worked, but - if your create disposition was CREATE_NEVER,
>> then why implement getSchema at all?
>>
>> On Sat, Mar 24, 2018, 7:01 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>
>>> The thing is that the previous log "Returning schema for ..." never
>>> appears, so I don't think anything will appear in the log if I log what
>>> you suggest either.
>>>
>>> Actually, after a couple more attempts, I changed the createDisposition
>>> of the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
>>> worked... So I guess there's something wrong when CREATE_NEVER is set or
>>> something I don't understand...
>>>
>>> FYI my BigQueryIO looks like this
>>>
>>> BigQueryIO.write()
>>>   .to(new JsonRouter(dataset))
>>>   .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
>>>   .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
>>>   .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
>>>   .withMethod(Write.Method.FILE_LOADS)
>>>
>>> Thanks!
>>>
>>> On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> Can you try logging the result of your BigQueryUtil.parseSchema and
>>>> confirm that it is always non-empty? What does the result look like for
>>>> the table that's failing to load?
>>>>
>>>> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> Hi everyone!!
>>>>>
>>>>> When trying to insert into BigQuery using dynamic destinations I get
>>>>> this error: "Table with field based partitioning must have a schema",
>>>>> which suggests that I'm not providing such a schema, and I don't
>>>>> understand why, as I think I am. Here: https://pastebin.com/Q1jF024B
>>>>> you can find the full stack trace, and below you can see the code of
>>>>> the DynamicDestinations implementation. Basically I'm dumping a PubSub
>>>>> stream of heterogeneous Json documents into BQ and routing each type
>>>>> to its corresponding table.
>>>>>
>>>>> The tuples contain the Json document and the schema itself for the
>>>>> corresponding table (the tuple is composed in a previous transform
>>>>> from a side input, as the schema is read from BQ using the
>>>>> BigQueryClient class), and the Destination KV[String, String] is
>>>>> supposed to hold the table name as key and the schema as value.
>>>>>
>>>>> The logs show many entries for "Returning destination for...", a few
>>>>> "Returning table..." ones and no "Returning schema for..." at all,
>>>>> which may indicate why BQ complains that no schema is provided. The
>>>>> question would then be... Why is that method never invoked?
>>>>>
>>>>> class JsonRouter(dataset: String)
>>>>>     extends DynamicDestinations[(Json, String), KV[String, String]] {
>>>>>
>>>>>   import JsonRouter._
>>>>>
>>>>>   override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>>>>     log.debug(s"Returning destination for ${element.getValue}")
>>>>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>>>>   }
>>>>>
>>>>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>>>>     log.debug(s"Returning schema for ${element.getKey}")
>>>>>     BigQueryUtil.parseSchema(element.getValue)
>>>>>   }
>>>>>
>>>>>   override def getTable(element: KV[String, String]): TableDestination = {
>>>>>     log.debug(s"Returning table for ${element.getKey}")
>>>>>     new TableDestination(s"$dataset.${element.getKey}", s"Table to store ${element.getKey}",
>>>>>       BQTypesRouter.TimePartitioning)
>>>>>   }
>>>>>
>>>>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>>>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>>>>> }
>>>>>
>>>>> Thanks!!