Hi Wout,

you are so right - I forgot the --tempLocation= parameter when launching. After that I also needed to set the number of file shards by adding:

.withNumFileShards(1)

Thank you!
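For reference, here is roughly what the working setup looks like as a self-contained sketch. GenerateSequence stands in for my Kafka source, and the one-column schema and class name are made up for the example; the table name is the one from the thread below:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

public class FileLoadsSketch {
  public static void main(String[] args) {
    // Must be launched with --tempLocation=gs://<bucket>/tmp; BatchLoads
    // validates it via GcsPath.fromUri() before staging the load files.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // Made-up one-column schema, just for the example.
    TableSchema schema =
        new TableSchema()
            .setFields(
                Collections.singletonList(
                    new TableFieldSchema().setName("value").setType("INTEGER")));

    // GenerateSequence stands in for the unbounded Kafka source.
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
        .apply(
            BigQueryIO.<Long>write()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(Duration.standardMinutes(10))
                // Required when triggering FILE_LOADS on an unbounded input:
                .withNumFileShards(1)
                .to("charged-dialect-824:KafkaStaging.test")
                .withSchema(schema)
                .withFormatFunction(n -> new TableRow().set("value", n))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                // WRITE_APPEND here; WRITE_TRUNCATE would overwrite the table
                // on every triggered load.
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}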
Tobi

On Wed, Oct 10, 2018 at 3:23 PM Wout Scheepers <wout.scheep...@vente-exclusive.com> wrote:

> Hey Tobias,
>
> org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
>
> points to the following code snippet (starting from BatchLoads.java:210):
>
> if (bigQueryServices == null) {
>   try {
>     GcsPath.fromUri(tempLocation);
>   } catch (IllegalArgumentException e) {
>     throw new IllegalArgumentException(
>         String.format(
>             "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
>             tempLocation),
>         e);
>   }
> }
>
> Are you sure your tempLocation is set correctly? I guess it's needed for staging a BigQuery load job instead of streaming.
>
> Wout
>
> From: "Kaymak, Tobias" <tobias.kay...@ricardo.ch>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Wednesday, 10 October 2018 at 14:18
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: How to use BigQueryIO Method.FILE_LOADS when reading from an unbounded source?
>
> I am trying to read from an unbounded source and use FILE_LOADS instead of streaming inserts towards BigQuery.
>
> If I don't have the following two lines
>
> .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> .withTriggeringFrequency(Duration.standardMinutes(10))
>
> my code works just fine, but uses streaming inserts. If I add them, I get a non-specific stack trace like:
>
> Exception in thread "main" java.lang.IllegalArgumentException
>   at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
>   at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
>   at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
>   at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
>   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>   at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>   at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
>   at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
>   at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
>   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>   at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>   at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)
>
> where line 181 is the first line of the following code excerpt:
>
> BigQueryIO.<Event>write()
>     .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>     .withTriggeringFrequency(Duration.standardMinutes(10))
>     .to(
>         new DynamicDestinations<Event, String>() {
>           @Override
>           public String getDestination(ValueInSingleWindow<Event> element) {
>             return element.getValue().getTopic();
>           }
>
>           @Override
>           public TableDestination getTable(String destination) {
>             return new TableDestination(
>                 "charged-dialect-824:KafkaStaging.test",
>                 null,
>                 new TimePartitioning().setType("DAY"));
>           }
>
>           @Override
>           public TableSchema getSchema(String destination) {
>             return inputMessagesConfig.getTableSchema(destination);
>           }
>         })
>     .withFormatFunction(
>         (SerializableFunction<Event, TableRow>) event -> convertUserEventToTableRow(event))
>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>
> I am not sure what I am doing wrong here. I tried higher values for the Duration, but that didn't help. I wasn't able to find the root cause for the exception with the debugger. Any idea how I can get to the bottom of this?
>
> Tobi

--
Tobias Kaymak
Data Engineer
tobias.kay...@ricardo.ch
www.ricardo.ch