This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/bq_writes
in repository https://gitbox.apache.org/repos/asf/beam.git
commit d674a8ed41764f6d37f8a6f1981c8dea8c2530ae
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Fri Sep 27 13:12:56 2024 -0400

    Always write to BQ from global window
---
 .github/trigger_files/beam_PostCommit_Python.json            | 2 +-
 .../beam_PostCommit_Python_ValidatesContainer_Dataflow.json  | 3 ++-
 sdks/python/apache_beam/io/gcp/bigquery.py                   | 3 +++
 sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py     | 4 ++--
 4 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index 30ee463ad4e..1eb60f6e495 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
index d6c608f6dab..4897480d69a 100644
--- a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
@@ -1,3 +1,4 @@
 {
-  "comment": "Modify this file in a trivial way to cause this test suite to run"
+  "comment": "Modify this file in a trivial way to cause this test suite to run",
+  "modification": 1
 }
\ No newline at end of file
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index b897df2d32a..a5f84dc441e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1864,6 +1864,9 @@ class _StreamToBigQuery(PTransform):
     return (
         tagged_data
         | 'FromHashableTableRef' >> beam.Map(_restore_table_ref)
+        # Use global window for writes since we're outputting back into the
+        # global window.
+        | 'Window into Global Window' >> beam.WindowInto(GlobalWindows())
         | 'StreamInsertRows' >> ParDo(
             bigquery_write_fn, *self.schema_side_inputs).with_outputs(
                 BigQueryWriteFn.FAILED_ROWS,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index b0140793cf7..bb5c36a3e9b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -470,7 +470,7 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
     input_data = [{
         'number': 1,
         'str': 'some_string',
-    }]
+    }]*500

     table_schema = {
         "fields": [{
@@ -483,7 +483,7 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
     bq_result_errors = [({
         'number': 1,
         'str': 'some_string',
-    }, "Not Found")]
+    }, "Not Found")]*500

     args = self.test_pipeline.get_full_options_as_args()
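
For context, below is a minimal runnable sketch (not part of the commit) of the
re-windowing pattern the bigquery.py hunk applies: elements carrying a
non-global windowing are moved back into the global window immediately before
the write step. The input data, the FixedWindows(60) upstream windowing, and
all step names other than 'Window into Global Window' are illustrative
assumptions, not code from this repository.

    import apache_beam as beam
    from apache_beam.transforms.window import FixedWindows, GlobalWindows

    with beam.Pipeline() as p:
      _ = (
          p
          | 'Create' >> beam.Create([{'number': 1, 'str': 'some_string'}] * 500)
          # Assume an upstream step placed elements into fixed windows.
          | 'FixedWindows' >> beam.WindowInto(FixedWindows(60))
          # Re-window into the global window, as the hunk above does right
          # before 'StreamInsertRows', so the write step is not constrained
          # by whatever windowing the input carried.
          | 'Window into Global Window' >> beam.WindowInto(GlobalWindows())
          | 'Write' >> beam.Map(print))

Per the comment added in the diff, the rationale is that the transform's
outputs are emitted back into the global window, so the write itself should
also run in the global window rather than in whatever window the input
arrived in.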