[
https://issues.apache.org/jira/browse/BEAM-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548818#comment-17548818
]
Danny McCormick commented on BEAM-10406:
----------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20433
> BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in
> python
> ---------------------------------------------------------------------------------
>
> Key: BEAM-10406
> URL: https://issues.apache.org/jira/browse/BEAM-10406
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp
> Affects Versions: 2.22.0
> Reporter: Nikunj Aggarwal
> Priority: P3
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> We are using FILE_LOADS to write to big query in streaming mode using python.
> Input is coming from a pubsub topic with ~5000 reqs/sec and each request is
> around 6KB. We perform some transforms on the input and then write to
> BigQuery.
>
> {code:python}
> beam.io.WriteToBigQuery(
>     table=table_name,
>     schema=schema,
>     dataset=dataset_name,
>     project=project,
>     method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
>     triggering_frequency=2 * 60,
>     create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
>     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
> )
> {code}
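
For scale, the figures quoted above imply the following volume per load interval. This is only a back-of-the-envelope sketch: it assumes a steady 5000 messages/s, takes "6KB" as 6000 bytes, assumes one output row per input message (the report does not say what the intermediate transforms emit), and reads triggering_frequency=2 * 60 as one load every 120 seconds. The function names and the rows_per_file parameter are hypothetical, introduced only for illustration.

```python
# Rough volume estimate from the numbers in the report (assumed steady rate).
MESSAGES_PER_SEC = 5000        # "~5000 reqs/sec"
BYTES_PER_MESSAGE = 6 * 1000   # "around 6KB", taken here as 6000 bytes
TRIGGER_SECONDS = 2 * 60       # triggering_frequency from the snippet above


def rows_per_trigger(rate=MESSAGES_PER_SEC, seconds=TRIGGER_SECONDS):
    """Rows arriving during one triggering interval."""
    return rate * seconds


def bytes_per_trigger(rate=MESSAGES_PER_SEC, seconds=TRIGGER_SECONDS,
                      size=BYTES_PER_MESSAGE):
    """Approximate payload bytes arriving during one triggering interval."""
    return rows_per_trigger(rate, seconds) * size


def files_per_trigger(rows_per_file, rate=MESSAGES_PER_SEC,
                      seconds=TRIGGER_SECONDS):
    """Temporary files needed per interval if each file holds rows_per_file rows."""
    rows = rows_per_trigger(rate, seconds)
    return -(-rows // rows_per_file)  # ceiling division


if __name__ == "__main__":
    print("rows per interval:", rows_per_trigger())
    print("GB per interval:", bytes_per_trigger() / 1e9)
    for n in (1, 5):
        print(n, "rows/file ->", files_per_trigger(n), "files")
```

Under these assumptions, one interval covers 600,000 rows (about 3.6 GB), so files holding only a handful of rows each would mean on the order of 10^5 temporary files per load.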
>
> We noticed that each temporary GCS file created by the load process had a
> very small number of rows (~1-5). We are able to reproduce this with both
> the direct runner and the Dataflow runner. After debugging, we believe the
> following is the cause:
> In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), we create
> destinations in start_bundle and clear them in finish_bundle. In streaming
> mode, a typical bundle passed to the ParDo contains only ~1-5 elements.
> Windowing is applied before the ParDo, but since there is no GroupByKey,
> the window and its trigger do not affect how elements are bundled for the
> ParDo. Below is a small piece of code that reproduces the issue:
>
>
> {code:python}
> import argparse
>
> import apache_beam as beam
> from apache_beam.transforms import trigger
>
>
> class WriteRecordsToFile(beam.DoFn):
>     # Buffers elements per bundle to show how small streaming bundles are.
>     def start_bundle(self):
>         print('start bundle')
>         self.data = []
>
>     def process(self, element):
>         self.data.append(element)
>
>     def finish_bundle(self):
>         print('finish bundle', len(self.data))
>         self.data = []
>
>
> def run(argv=None):
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--input_subscription',
>         required=True,
>         help='Input PubSub subscription of the form '
>              '"projects/<project>/subscriptions/<subscription>".')
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     # Run with --streaming in pipeline_args, since the source is Pub/Sub.
>     with beam.Pipeline(argv=pipeline_args) as p:
>         lines = p | beam.io.ReadFromPubSub(
>             subscription=known_args.input_subscription)
>         (lines
>          | beam.WindowInto(
>              beam.window.GlobalWindows(),
>              trigger=trigger.Repeatedly(
>                  trigger.AfterAny(
>                      trigger.AfterProcessingTime(60),
>                      trigger.AfterCount(100))),
>              accumulation_mode=trigger.AccumulationMode.DISCARDING)
>          | beam.ParDo(WriteRecordsToFile()))
>
>
> if __name__ == '__main__':
>     run()
> {code}
>
> In the above example, we see that start_bundle is called very often and
> does not respect the triggers.
> To fix the behavior of BigQueryBatchFileLoads, we suggest grouping elements
> after the window triggers and before calling ParDo(WriteRecordsToFile).
>
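
The suggestion above can be illustrated with a toy model in plain Python. This is not Beam code and not the actual WriteRecordsToFile implementation: BufferingWriter, run_bundles, split_into_bundles, and group_into_batches are hypothetical names, the 1-5 element bundle size is simply the figure reported in the issue, and the batch size of 100 mirrors the AfterCount(100) trigger in the reproduction. Each element is modeled as a list of rows, so ungrouped input is a stream of single-row lists.

```python
import random


class BufferingWriter:
    """Toy stand-in for a DoFn that buffers rows and writes one file per bundle."""

    def __init__(self):
        self.files = []      # row count of each "file" written
        self._buffer = []

    def start_bundle(self):
        self._buffer = []

    def process(self, element):
        # An element is a list of rows (one row if ungrouped, many if grouped).
        self._buffer.extend(element)

    def finish_bundle(self):
        if self._buffer:
            self.files.append(len(self._buffer))
        self._buffer = []


def run_bundles(writer, bundles):
    """Drive the writer the way a runner would: one lifecycle per bundle."""
    for bundle in bundles:
        writer.start_bundle()
        for element in bundle:
            writer.process(element)
        writer.finish_bundle()
    return writer.files


def split_into_bundles(elements, rng, min_size=1, max_size=5):
    """Mimic a streaming runner handing out tiny bundles (sizes are assumed)."""
    bundles, i = [], 0
    while i < len(elements):
        size = rng.randint(min_size, max_size)
        bundles.append(elements[i:i + size])
        i += size
    return bundles


def group_into_batches(rows, batch_size):
    """Stand-in for a grouping step: each batch becomes a single element."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


if __name__ == "__main__":
    rng = random.Random(0)
    rows = list(range(1000))
    ungrouped = run_bundles(
        BufferingWriter(), split_into_bundles([[r] for r in rows], rng))
    grouped = run_bundles(
        BufferingWriter(), split_into_bundles(group_into_batches(rows, 100), rng))
    print("ungrouped: files =", len(ungrouped), "max rows/file =", max(ungrouped))
    print("grouped:   files =", len(grouped), "min rows/file =", min(grouped))
```

In this model the bundle sizes are identical in both runs; only the element granularity changes. Without grouping, no file can exceed 5 rows, while with grouping every file holds at least one full batch. How the grouping should be expressed in the real transform is left open here.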