https://github.com/apache/beam/issues/26515 tracks this issue. The fix was merged. Thanks a lot for reporting this issue, Binh!
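For anyone finding this thread later: the PrintFn referenced in the pipeline code quoted below is a user-defined DoFn, not something that ships with the Beam SDK. A minimal sketch of what such a helper might look like, assuming it simply logs each failed insert and passes it through unchanged:

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical logging DoFn: emits a WARN log line for every element and
// re-outputs it so downstream transforms can still consume the collection.
public class PrintFn<T> extends DoFn<T, T> {
  private static final Logger LOG = LoggerFactory.getLogger(PrintFn.class);

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<T> out) {
    // Log the failed element; for failed storage API inserts this surfaces
    // whatever the element's toString() reports.
    LOG.warn("Failed insert: {}", element);
    out.output(element);
  }
}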
On Mon, Apr 17, 2023 at 12:58 PM Binh Nguyen Van <binhn...@gmail.com> wrote:

> Hi,
>
> I tested with streaming inserts and file loads, and they both worked as
> expected. But since the Storage Write API looks like the new way to go, I
> want to test it too. I am using Apache Beam v2.46.0 and running it with
> Google Dataflow.
>
> Thanks
> -Binh
>
>
> On Mon, Apr 17, 2023 at 9:53 AM Reuven Lax via user <user@beam.apache.org>
> wrote:
>
>> What version of Beam are you using? There are no known data-loss bugs in
>> the connector; however, if there has been a regression we would like to
>> address it with high priority.
>>
>> On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van <binhn...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a job that uses the BigQuery IO Connector to write to a BigQuery
>>> table. When I test it with a small number of records (100) it works as
>>> expected, but when I test it with a larger number of records (10000), I
>>> don't see all of the records written to the output table, only part of
>>> them. The count changes from run to run, but so far it has never been
>>> more than 1000 records.
>>>
>>> There are WARNING log entries in the job log like this:
>>>
>>> by client #0 failed with error, operations will be retried.
>>> com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists:
>>> ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>>>
>>> And one log entry like this:
>>>
>>> Finalize of stream [stream-name] finished with row_count: 683
>>>
>>> If I sum all the numbers reported in the WARNING messages with the one in
>>> the finalize-of-stream entry above, I get 10000, which is exactly the
>>> number of input records.
>>>
>>> My pipeline uses 1 worker and looks like this:
>>>
>>> WriteResult writeResult = inputCollection.apply(
>>>     "Save Events To BigQuery",
>>>     BigQueryIO.<MyEvent>write()
>>>         .to(options.getTable())
>>>         .withFormatFunction(TableRowMappers::toRow)
>>>         .withMethod(Method.STORAGE_WRITE_API)
>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>         .withExtendedErrorInfo());
>>>
>>> writeResult
>>>     .getFailedStorageApiInserts()
>>>     .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
>>>
>>> There are no log entries for the failed inserts.
>>>
>>> Is there anything wrong with my pipeline code, or is it a bug in the
>>> BigQuery IO Connector?
>>>
>>> Thanks
>>> -Binh