This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/bq_writes
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d674a8ed41764f6d37f8a6f1981c8dea8c2530ae
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Fri Sep 27 13:12:56 2024 -0400

    Always write to BQ from global window
---
 .github/trigger_files/beam_PostCommit_Python.json                     | 2 +-
 .../beam_PostCommit_Python_ValidatesContainer_Dataflow.json           | 3 ++-
 sdks/python/apache_beam/io/gcp/bigquery.py                            | 3 +++
 sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py              | 4 ++--
 4 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index 30ee463ad4e..1eb60f6e495 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "modification": 2
+  "modification": 3
 }
 
diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
index d6c608f6dab..4897480d69a 100644
--- a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
@@ -1,3 +1,4 @@
 {
-    "comment": "Modify this file in a trivial way to cause this test suite to 
run"
+    "comment": "Modify this file in a trivial way to cause this test suite to 
run",
+    "modification": 1
 }
\ No newline at end of file
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index b897df2d32a..a5f84dc441e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1864,6 +1864,9 @@ class _StreamToBigQuery(PTransform):
     return (
         tagged_data
         | 'FromHashableTableRef' >> beam.Map(_restore_table_ref)
+        # Use global window for writes since we're outputting back into the
+        # global window.
+        | 'Window into Global Window' >> beam.WindowInto(GlobalWindows())
         | 'StreamInsertRows' >> ParDo(
             bigquery_write_fn, *self.schema_side_inputs).with_outputs(
                 BigQueryWriteFn.FAILED_ROWS,
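
For context, the added 'Window into Global Window' step re-windows elements
before the streaming-insert ParDo so that its outputs land in the global
window, as the inline comment above explains. A minimal standalone sketch of
that pattern follows; the pipeline, step names, and data here are
illustrative only and are not part of this commit:

    import apache_beam as beam
    from apache_beam.transforms.window import (
        FixedWindows, GlobalWindows, TimestampedValue)

    with beam.Pipeline() as p:
        _ = (
            p
            | 'Create' >> beam.Create(range(4))
            | 'Timestamp' >> beam.Map(lambda x: TimestampedValue(x, x * 60))
            | 'FixedWindows' >> beam.WindowInto(FixedWindows(60))
            # Re-window into the global window before the final step, as the
            # hunk above does ahead of 'StreamInsertRows'.
            | 'GlobalWindow' >> beam.WindowInto(GlobalWindows())
            | 'Print' >> beam.Map(print))
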
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index b0140793cf7..bb5c36a3e9b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -470,7 +470,7 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
     input_data = [{
         'number': 1,
         'str': 'some_string',
-    }]
+    }]*500
 
     table_schema = {
         "fields": [{
@@ -483,7 +483,7 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
     bq_result_errors = [({
         'number': 1,
         'str': 'some_string',
-    }, "Not Found")]
+    }, "Not Found")]*500
 
     args = self.test_pipeline.get_full_options_as_args()
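
A side note on the test change: multiplying the single-element list by 500
replicates references to the same dict, which is fine for read-only pipeline
input and expected-error lists. A quick illustration, reusing the names from
the test above:

    input_data = [{'number': 1, 'str': 'some_string'}] * 500
    assert len(input_data) == 500
    # All 500 entries alias one dict object; deep copies would only be
    # needed if the pipeline mutated elements in place.
    assert all(d is input_data[0] for d in input_data)
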
 
