This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fa7f45cf91 [BEAM-14112] Fixed ReadFromBigQuery with Interactive Beam (#17306)
1fa7f45cf91 is described below

commit 1fa7f45cf91681bb8ae53a23da705cb543a2bb58
Author: Ning Kang <ningkang0...@gmail.com>
AuthorDate: Wed Apr 20 10:03:19 2022 -0700

    [BEAM-14112] Fixed ReadFromBigQuery with Interactive Beam (#17306)
    
    Converted the split_result attribute to a pickle-able dataclass that
    stores only export file paths and a coder, instead of sources or table
    schemas, to further reduce the size of the transform objects and avoid
    redundant coder creation.
    Note: a transform is never guaranteed to be pickle-able; this change
    only accommodates the current Interactive Beam implementation.
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 27 ++++++++++++++--------
 .../apache_beam/io/gcp/bigquery_read_it_test.py    | 12 ++++++++++
 2 files changed, 30 insertions(+), 9 deletions(-)
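
For context, a minimal standalone sketch (not part of the commit; all names
are hypothetical) of the pickling issue the new dataclass addresses: a
generator of sources cannot be pickled, while a dataclass holding only plain
file paths and a single pre-built coder can.

import pickle
from dataclasses import dataclass
from typing import Any, List

# Stand-in for the real _BigQueryExportResult, for illustration only.
@dataclass
class ExportResult:
  coder: Any        # one pre-built coder instance
  paths: List[str]  # paths of the exported files

sources = (path for path in ['gs://tmp/f1.avro'])  # old style: a generator
try:
  pickle.dumps(sources)
except TypeError as err:
  print('generator cannot be pickled:', err)

result = ExportResult(coder='json-coder', paths=['gs://tmp/f1.avro'])
pickle.loads(pickle.dumps(result))  # a dataclass of plain fields round-trips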

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 6592a61228b..f443d24950f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -277,6 +277,7 @@ import logging
 import random
 import time
 import uuid
+from dataclasses import dataclass
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -657,6 +658,14 @@ class _BigQuerySource(dataflow_io.NativeSource):
         kms_key=self.kms_key)
 
 
+# TODO(BEAM-14331): remove the serialization restriction in transform
+# implementation once InteractiveRunner can work without runner api roundtrips.
+@dataclass
+class _BigQueryExportResult:
+  coder: beam.coders.Coder
+  paths: List[str]
+
+
 class _CustomBigQuerySource(BoundedSource):
   def __init__(
       self,
@@ -705,7 +714,7 @@ class _CustomBigQuerySource(BoundedSource):
     self.flatten_results = flatten_results
     self.coder = coder or _JsonToDictCoder
     self.kms_key = kms_key
-    self.split_result = None
+    self.export_result = None
     self.options = pipeline_options
     self.bq_io_metadata = None  # Populate in setup, as it may make an RPC
     self.bigquery_job_labels = bigquery_job_labels or {}
@@ -789,7 +798,7 @@ class _CustomBigQuerySource(BoundedSource):
       project = self.project
     return project
 
-  def _create_source(self, path, schema):
+  def _create_source(self, path, coder):
     if not self.use_json_exports:
       return create_avro_source(path)
     else:
@@ -798,10 +807,10 @@ class _CustomBigQuerySource(BoundedSource):
           min_bundle_size=0,
           compression_type=CompressionTypes.UNCOMPRESSED,
           strip_trailing_newlines=True,
-          coder=self.coder(schema))
+          coder=coder)
 
   def split(self, desired_bundle_size, start_position=None, stop_position=None):
-    if self.split_result is None:
+    if self.export_result is None:
       bq = bigquery_tools.BigQueryWrapper(
           temp_dataset_id=(
               self.temp_dataset.datasetId if self.temp_dataset else None))
@@ -814,15 +823,15 @@ class _CustomBigQuerySource(BoundedSource):
         self.table_reference.projectId = self._get_project()
 
       schema, metadata_list = self._export_files(bq)
-      # Sources to be created lazily within a generator as they're output.
-      self.split_result = (
-          self._create_source(metadata.path, schema)
-          for metadata in metadata_list)
+      self.export_result = _BigQueryExportResult(
+          coder=self.coder(schema),
+          paths=[metadata.path for metadata in metadata_list])
 
       if self.query is not None:
         bq.clean_up_temporary_dataset(self._get_project())
 
-    for source in self.split_result:
+    for path in self.export_result.paths:
+      source = self._create_source(path, self.export_result.coder)
       yield SourceBundle(
           weight=1.0, source=source, start_position=None, stop_position=None)
 
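The reworked split() above follows a general pattern: keep only plain,
pickle-able state on the source object and rebuild the heavyweight file
sources on every call. A self-contained sketch of that pattern (hypothetical
names, outside Beam):

import pickle
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class ExportResult:  # plain data, safe to pickle
  coder_name: str
  paths: List[str]

class LazySource:
  def __init__(self):
    self.export_result: Optional[ExportResult] = None

  def _create_source(self, path, coder_name):
    # Stand-in for create_avro_source / _TextSource in the real code.
    return (path, coder_name)

  def split(self):
    if self.export_result is None:  # one-time "export"
      self.export_result = ExportResult(coder_name='json', paths=['f1', 'f2'])
    for path in self.export_result.paths:
      # Sources are rebuilt on each call rather than cached in a generator,
      # so the object stays pickle-able at any point in its lifetime.
      yield self._create_source(path, self.export_result.coder_name)

src = LazySource()
print(list(src.split()))                    # populates export_result
restored = pickle.loads(pickle.dumps(src))  # round-trips without error
print(list(restored.split()))
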
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 91010395a0b..e47754d1521 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -37,6 +37,8 @@ from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.runners.interactive import interactive_beam
+from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -673,6 +675,16 @@ class ReadAllBQTests(BigQueryReadIntegrationTests):
           equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3))
 
 
+class ReadInteractiveRunnerTests(BigQueryReadIntegrationTests):
+  @skip(['PortableRunner', 'FlinkRunner'])
+  @pytest.mark.it_postcommit
+  def test_read_in_interactive_runner(self):
+    p = beam.Pipeline(InteractiveRunner(), argv=self.args)
+    pcoll = p | beam.io.ReadFromBigQuery(query="SELECT 1")
+    result = interactive_beam.collect(pcoll)
+    assert result.iloc[0, 0] == 1
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
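
For readers skimming the new test: interactive_beam.collect materializes a
PCollection into a pandas DataFrame, which is why the assertion indexes with
.iloc. A minimal local sketch of the same flow (no BigQuery involved),
assuming an environment with Beam's interactive dependencies installed:

import apache_beam as beam
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

p = beam.Pipeline(InteractiveRunner())
pcoll = p | beam.Create([1])
df = ib.collect(pcoll)  # a pandas DataFrame with one row per element
assert df.iloc[0, 0] == 1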
