Hi,

I have a job that uses the BigQuery IO connector to write to a BigQuery table.
When I test it with a small number of records (100) it works as expected,
but when I tested it with a larger number of records (10000), not all of the
records are written to the output table, only part of them. The count changes
from run to run, but so far it has never been more than 1000 records.

There are WARNING log entries in the job log like this:

by client #0 failed with error, operations will be retried.
com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists:
ALREADY_EXISTS: The offset is within stream, expected offset 933,
received 0

And one log entry like this:

Finalize of stream [stream-name] finished with row_count: 683

If I sum all the offsets reported in the WARNING messages with the row_count
in the finalize-of-stream entry above, I get 10000, which is exactly the
number of input records.

My pipeline uses a single worker and looks like this:

WriteResult writeResult = inputCollection.apply(
    "Save Events To BigQuery",
    BigQueryIO.<MyEvent>write()
        .to(options.getTable())
        .withFormatFunction(TableRowMappers::toRow)
        .withMethod(Method.STORAGE_WRITE_API)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withExtendedErrorInfo());

writeResult
    .getFailedStorageApiInserts()
    .apply("Log failed inserts", ParDo.of(new PrintFn<>()));

There are no log entries for the failed inserts.
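
For context, PrintFn is just a simple pass-through DoFn that logs each
element it receives, roughly like this (a minimal sketch; the class and
logger names are mine, the actual implementation is not shown here):

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintFn<T> extends DoFn<T, T> {
  private static final Logger LOG = LoggerFactory.getLogger(PrintFn.class);

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<T> out) {
    // Log the element (a BigQueryStorageApiInsertError in the
    // failed-inserts case) and pass it through unchanged.
    LOG.warn("Failed insert: {}", element);
    out.output(element);
  }
}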

Is there anything wrong with my pipeline code, or is this a bug in the
BigQuery IO connector?

Thanks
-Binh
