I am trying to read from an unbounded source and write to BigQuery using FILE_LOADS instead of streaming inserts.

If I leave out the following two lines

    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))

my code works just fine, but uses streaming inserts. If I add them, I get a non-specific stack trace like:

    Exception in thread "main" java.lang.IllegalArgumentException
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
        at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
        at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)

where line 181 is the first line of the following code excerpt:

    BigQueryIO.<Event>write()
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardMinutes(10))
        .to(
            new DynamicDestinations<Event, String>() {
              @Override
              public String getDestination(ValueInSingleWindow<Event> element) {
                return element.getValue().getTopic();
              }

              @Override
              public TableDestination getTable(String destination) {
                return new TableDestination(
                    "charged-dialect-824:KafkaStaging.test",
                    null,
                    new TimePartitioning().setType("DAY"));
              }

              @Override
              public TableSchema getSchema(String destination) {
                return inputMessagesConfig.getTableSchema(destination);
              }
            })
        .withFormatFunction(
            (SerializableFunction<Event, TableRow>)
                event -> convertUserEventToTableRow(event))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I am not sure what I am doing wrong here. I tried higher values for the Duration, but that didn't help. I wasn't able to find the root cause of the exception with the debugger. Any idea how I can get to the bottom of this?

Tobi