This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fa7f45cf91 [BEAM-14112] Fixed ReadFromBigQuery with Interactive Beam (#17306)
1fa7f45cf91 is described below

commit 1fa7f45cf91681bb8ae53a23da705cb543a2bb58
Author: Ning Kang <ningkang0...@gmail.com>
AuthorDate: Wed Apr 20 10:03:19 2022 -0700

    [BEAM-14112] Fixed ReadFromBigQuery with Interactive Beam (#17306)
    
    Converted the split_result attribute to a dataclass that is pickle-able
    and only stores paths and coder instead of sources or table schemas to
    further reduce the size of the transform objects and redundant coder
    creations.
    
    Note: it's never guaranteed that a transform should be pickle-able,
    this is just to work with the current Interactive Beam implementation.
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 27 ++++++++++++++--------
 .../apache_beam/io/gcp/bigquery_read_it_test.py    | 12 ++++++++++
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 6592a61228b..f443d24950f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -277,6 +277,7 @@ import logging
 import random
 import time
 import uuid
+from dataclasses import dataclass
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -657,6 +658,14 @@ class _BigQuerySource(dataflow_io.NativeSource):
         kms_key=self.kms_key)
 
 
+# TODO(BEAM-14331): remove the serialization restriction in transform
+# implementation once InteractiveRunner can work without runner api roundtrips.
+@dataclass
+class _BigQueryExportResult:
+  coder: beam.coders.Coder
+  paths: List[str]
+
+
 class _CustomBigQuerySource(BoundedSource):
   def __init__(
       self,
@@ -705,7 +714,7 @@ class _CustomBigQuerySource(BoundedSource):
     self.flatten_results = flatten_results
     self.coder = coder or _JsonToDictCoder
     self.kms_key = kms_key
-    self.split_result = None
+    self.export_result = None
     self.options = pipeline_options
     self.bq_io_metadata = None  # Populate in setup, as it may make an RPC
     self.bigquery_job_labels = bigquery_job_labels or {}
@@ -789,7 +798,7 @@ class _CustomBigQuerySource(BoundedSource):
       project = self.project
     return project
 
-  def _create_source(self, path, schema):
+  def _create_source(self, path, coder):
     if not self.use_json_exports:
       return create_avro_source(path)
     else:
@@ -798,10 +807,10 @@ class _CustomBigQuerySource(BoundedSource):
           min_bundle_size=0,
           compression_type=CompressionTypes.UNCOMPRESSED,
           strip_trailing_newlines=True,
-          coder=self.coder(schema))
+          coder=coder)
 
   def split(self, desired_bundle_size, start_position=None, stop_position=None):
-    if self.split_result is None:
+    if self.export_result is None:
       bq = bigquery_tools.BigQueryWrapper(
           temp_dataset_id=(
               self.temp_dataset.datasetId if self.temp_dataset else None))
@@ -814,15 +823,15 @@ class _CustomBigQuerySource(BoundedSource):
           self.table_reference.projectId = self._get_project()
 
       schema, metadata_list = self._export_files(bq)
-      # Sources to be created lazily within a generator as they're output.
-      self.split_result = (
-          self._create_source(metadata.path, schema)
-          for metadata in metadata_list)
+      self.export_result = _BigQueryExportResult(
+          coder=self.coder(schema),
+          paths=[metadata.path for metadata in metadata_list])
 
       if self.query is not None:
         bq.clean_up_temporary_dataset(self._get_project())
 
-    for source in self.split_result:
+    for path in self.export_result.paths:
+      source = self._create_source(path, self.export_result.coder)
       yield SourceBundle(
           weight=1.0, source=source, start_position=None, stop_position=None)
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 91010395a0b..e47754d1521 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -37,6 +37,8 @@ from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.runners.interactive import interactive_beam
+from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -673,6 +675,16 @@ class ReadAllBQTests(BigQueryReadIntegrationTests):
         equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3))
 
 
+class ReadInteractiveRunnerTests(BigQueryReadIntegrationTests):
+  @skip(['PortableRunner', 'FlinkRunner'])
+  @pytest.mark.it_postcommit
+  def test_read_in_interactive_runner(self):
+    p = beam.Pipeline(InteractiveRunner(), argv=self.args)
+    pcoll = p | beam.io.ReadFromBigQuery(query="SELECT 1")
+    result = interactive_beam.collect(pcoll)
+    assert result.iloc[0, 0] == 1
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()