This is an automated email from the ASF dual-hosted git repository. chamikara pushed a commit to branch revert-17100-cyang/rfbq-interactive-fix in repository https://gitbox.apache.org/repos/asf/beam.git
commit 3ccd12e69bda2816929a54cf3639738c82733782 Author: Chamikara Jayalath <chamik...@google.com> AuthorDate: Mon Mar 21 10:20:13 2022 -0700 Revert "[BEAM-14112] Avoid storing a generator in _CustomBigQuerySource (#17100)" This reverts commit 62a661071b7db15e71d236abe68e15582e8997c9. --- sdks/python/apache_beam/io/gcp/bigquery.py | 38 +++++++++++++--------- .../apache_beam/io/gcp/bigquery_read_it_test.py | 12 ------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index fc0f4b7..0503de8 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -705,7 +705,7 @@ class _CustomBigQuerySource(BoundedSource): self.flatten_results = flatten_results self.coder = coder or _JsonToDictCoder self.kms_key = kms_key - self.export_result = None + self.split_result = None self.options = pipeline_options self.bq_io_metadata = None # Populate in setup, as it may make an RPC self.bigquery_job_labels = bigquery_job_labels or {} @@ -789,26 +789,19 @@ class _CustomBigQuerySource(BoundedSource): project = self.project return project - def _create_source(self, path, bq): + def _create_source(self, path, schema): if not self.use_json_exports: return create_avro_source(path) else: - if isinstance(self.table_reference, vp.ValueProvider): - table_ref = bigquery_tools.parse_table_reference( - self.table_reference.get(), project=self.project) - else: - table_ref = self.table_reference - table = bq.get_table( - table_ref.projectId, table_ref.datasetId, table_ref.tableId) return TextSource( path, min_bundle_size=0, compression_type=CompressionTypes.UNCOMPRESSED, strip_trailing_newlines=True, - coder=self.coder(table.schema)) + coder=self.coder(schema)) def split(self, desired_bundle_size, start_position=None, stop_position=None): - if self.export_result is None: + if self.split_result is None: bq = bigquery_tools.BigQueryWrapper( temp_dataset_id=( self.temp_dataset.datasetId if self.temp_dataset else None)) @@ -820,13 +813,16 @@ class _CustomBigQuerySource(BoundedSource): if not self.table_reference.projectId: self.table_reference.projectId = self._get_project() - self.export_result = self._export_files(bq) + schema, metadata_list = self._export_files(bq) + # Sources to be created lazily within a generator as they're output. + self.split_result = ( + self._create_source(metadata.path, schema) + for metadata in metadata_list) if self.query is not None: bq.clean_up_temporary_dataset(self._get_project()) - for metadata in self.export_result: - source = self._create_source(metadata.path, bq) + for source in self.split_result: yield SourceBundle( weight=1.0, source=source, start_position=None, stop_position=None) @@ -878,7 +874,7 @@ class _CustomBigQuerySource(BoundedSource): """Runs a BigQuery export job. Returns: - a list of FileMetadata instances + bigquery.TableSchema instance, a list of FileMetadata instances """ job_labels = self._get_bq_metadata().add_additional_bq_job_labels( self.bigquery_job_labels) @@ -908,7 +904,17 @@ class _CustomBigQuerySource(BoundedSource): job_labels=job_labels, use_avro_logical_types=True) bq.wait_for_bq_job(job_ref) - return FileSystems.match([gcs_location])[0].metadata_list + metadata_list = FileSystems.match([gcs_location])[0].metadata_list + + if isinstance(self.table_reference, vp.ValueProvider): + table_ref = bigquery_tools.parse_table_reference( + self.table_reference.get(), project=self.project) + else: + table_ref = self.table_reference + table = bq.get_table( + table_ref.projectId, table_ref.datasetId, table_ref.tableId) + + return table.schema, metadata_list class _CustomBigQueryStorageSource(BoundedSource): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index e47754d..9101039 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -37,8 +37,6 @@ from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.options.value_provider import StaticValueProvider -from apache_beam.runners.interactive import interactive_beam -from apache_beam.runners.interactive.interactive_runner import InteractiveRunner from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -675,16 +673,6 @@ class ReadAllBQTests(BigQueryReadIntegrationTests): equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3)) -class ReadInteractiveRunnerTests(BigQueryReadIntegrationTests): - @skip(['PortableRunner', 'FlinkRunner']) - @pytest.mark.it_postcommit - def test_read_in_interactive_runner(self): - p = beam.Pipeline(InteractiveRunner(), argv=self.args) - pcoll = p | beam.io.ReadFromBigQuery(query="SELECT 1") - result = interactive_beam.collect(pcoll) - assert result.iloc[0, 0] == 1 - - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()