Hi,

I tested with streaming inserts and file loads, and both worked as
expected. But it looks like the Storage API is the new way to go, so I want
to test it too. I am using Apache Beam v2.46.0 and running it on Google
Dataflow.

Thanks
-Binh


On Mon, Apr 17, 2023 at 9:53 AM Reuven Lax via user <[email protected]>
wrote:

> What version of Beam are you using? There are no known data-loss bugs in
> the connector; however, if there has been a regression, we would like to
> address it with high priority.
>
> On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van <[email protected]>
> wrote:
>
>> Hi,
>>
>> I have a job that uses the BigQuery IO connector to write to a BigQuery
>> table. When I test it with a small number of records (100), it works as
>> expected, but when I test it with a larger number of records (10000), only
>> part of them are written to the output table. The count changes from run
>> to run, but so far I have never seen more than 1000 records.
>>
>> There are WARNING log entries in the job log like this
>>
>> by client #0 failed with error, operations will be retried.
>> com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists: 
>> ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>>
>> And one log entry like this
>>
>> Finalize of stream [stream-name] finished with row_count: 683
>>
>> If I sum all the numbers reported in the WARNING messages together with
>> the row count in the stream finalize entry above, I get 10000, which is
>> exactly the number of input records.
>>
>> My pipeline uses 1 worker and it looks like this
>>
>> WriteResult writeResult = inputCollection.apply(
>>     "Save Events To BigQuery",
>>     BigQueryIO.<MyEvent>write()
>>         .to(options.getTable())
>>         .withFormatFunction(TableRowMappers::toRow)
>>         .withMethod(Method.STORAGE_WRITE_API)
>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>         .withExtendedErrorInfo());
>>
>> writeResult
>>     .getFailedStorageApiInserts()
>>     .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
>>
>> There are no log entries for the failed inserts.
>>
>> Is there anything wrong with my pipeline code, or is this a bug in the
>> BigQuery IO connector?
>>
>> Thanks
>> -Binh
>>
>
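
For anyone who wants to repeat the tally described in the original message, here is a minimal sketch in plain Java. It assumes that the number to add from each WARNING entry is the "expected offset" value and that the finalize entry contributes its "row_count"; that reading comes from the observation in the thread, not from documented connector behaviour. The class name OffsetTally and its tally method are hypothetical, and the sample input holds only the two log lines quoted in the thread, so it prints a partial sum rather than the full 10000.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OffsetTally {
    // Matches the "expected offset N" part of an OffsetAlreadyExists warning.
    private static final Pattern EXPECTED_OFFSET =
        Pattern.compile("expected offset (\\d+)");
    // Matches the "row_count: N" part of a stream finalize log entry.
    private static final Pattern ROW_COUNT =
        Pattern.compile("row_count: (\\d+)");

    // Sums every expected offset and every finalized row count in the lines.
    // Assumption: each warning's expected offset counts rows already appended.
    static long tally(List<String> logLines) {
        long total = 0;
        for (String line : logLines) {
            Matcher offset = EXPECTED_OFFSET.matcher(line);
            if (offset.find()) {
                total += Long.parseLong(offset.group(1));
                continue;
            }
            Matcher rows = ROW_COUNT.matcher(line);
            if (rows.find()) {
                total += Long.parseLong(rows.group(1));
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Only the two log lines quoted in the thread; a real job log would
        // contain more WARNING entries.
        List<String> lines = List.of(
            "ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0",
            "Finalize of stream [stream-name] finished with row_count: 683");
        System.out.println(tally(lines)); // 933 + 683
    }
}
```

Feeding the complete set of WARNING and finalize lines from a job log into tally and comparing the result with the input record count would show whether the pattern reported above holds for a given run.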
