Hi,

I tested with streaming inserts and file loads, and both worked as expected. But it looks like the Storage Write API is the new way to go, so I want to test it too. I am using Apache Beam v2.46.0 and running the pipeline on Google Dataflow.
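For reference, the three tests differ only in the write method passed to withMethod; roughly like this (simplified, with MyEvent, TableRowMappers, and options.getTable() taken from my pipeline quoted below):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;

    // Shared configuration, identical across the three tests.
    BigQueryIO.Write<MyEvent> write =
        BigQueryIO.<MyEvent>write()
            .to(options.getTable())
            .withFormatFunction(TableRowMappers::toRow)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

    // The three variants tested:
    write.withMethod(Method.STREAMING_INSERTS);  // worked as expected
    write.withMethod(Method.FILE_LOADS);         // worked as expected
    write.withMethod(Method.STORAGE_WRITE_API);  // records go missing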
Thanks
-Binh

On Mon, Apr 17, 2023 at 9:53 AM Reuven Lax via user <[email protected]> wrote:

> What version of Beam are you using? There are no known data-loss bugs in
> the connector, however if there has been a regression we would like to
> address it with high priority.
>
> On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van <[email protected]> wrote:
>
>> Hi,
>>
>> I have a job that uses the BigQuery IO Connector to write to a BigQuery
>> table. When I test it with a small number of records (100) it works as
>> expected, but when I test it with a larger number of records (10000), I
>> don't see all of the records written to the output table, only part of
>> them. The number written changes from run to run, but so far it has never
>> been more than 1000 records.
>>
>> There are WARNING log entries in the job log like this:
>>
>>   by client #0 failed with error, operations will be retried.
>>   com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists:
>>   ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>>
>> And one log entry like this:
>>
>>   Finalize of stream [stream-name] finished with row_count: 683
>>
>> If I sum all the numbers reported in the WARNING messages with the one in
>> the finalize-of-stream entry above, I get 10000, which is exactly the
>> number of input records.
>>
>> My pipeline uses 1 worker and it looks like this:
>>
>>   WriteResult writeResult = inputCollection.apply(
>>       "Save Events To BigQuery",
>>       BigQueryIO.<MyEvent>write()
>>           .to(options.getTable())
>>           .withFormatFunction(TableRowMappers::toRow)
>>           .withMethod(Method.STORAGE_WRITE_API)
>>           .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>           .withExtendedErrorInfo());
>>
>>   writeResult
>>       .getFailedStorageApiInserts()
>>       .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
>>
>> There are no log entries for the failed inserts.
>>
>> Is there anything wrong with my pipeline code, or is it a bug in the
>> BigQuery IO Connector?
>>
>> Thanks
>> -Binh
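P.S. For completeness, PrintFn in the pipeline above is just a pass-through logging DoFn, along these lines (a simplified sketch of the class, which is not shown in full above):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Simplified sketch of the PrintFn referenced in the pipeline above.
    // Logs each element and passes it through unchanged. In the pipeline,
    // getFailedStorageApiInserts() emits BigQueryStorageApiInsertError
    // elements, so T is bound to that type.
    class PrintFn<T> extends DoFn<T, T> {
      private static final Logger LOG = LoggerFactory.getLogger(PrintFn.class);

      @ProcessElement
      public void processElement(@Element T element, OutputReceiver<T> out) {
        LOG.error("Failed insert: {}", element);
        out.output(element);
      }
    }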
