[ https://issues.apache.org/jira/browse/BEAM-9650?focusedWorklogId=430947&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-430947 ]
ASF GitHub Bot logged work on BEAM-9650: ---------------------------------------- Author: ASF GitHub Bot Created on: 05/May/20 23:00 Start Date: 05/May/20 23:00 Worklog Time Spent: 10m Work Description: pabloem commented on a change in pull request #11582: URL: https://github.com/apache/beam/pull/11582#discussion_r420456908 ########## File path: sdks/python/apache_beam/io/gcp/bigquery.py ########## @@ -1641,3 +1644,314 @@ def process(self, unused_element, signal): *self._args, **self._kwargs)) | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location))) + + +class _ExtractBQData(DoFn): + ''' + PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into + a temporary storage and returns metadata for created files. + ''' + def __init__( + self, + gcs_location_pattern=None, + project=None, + coder=None, + schema=None, + kms_key=None): + + self.gcs_location_pattern = gcs_location_pattern + self.project = project + self.coder = coder or _JsonToDictCoder + self.kms_key = kms_key + self.split_result = None + self.schema = schema + self.target_schema = None + + def process(self, element): + ''' + :param element(ReadAllFromBigQueryRequest): + :return: + ''' + element.validate() + if element.table is not None: + table_reference = bigquery_tools.parse_table_reference(element.table) + query = None + use_legacy_sql = True + else: + query = element.query + use_legacy_sql = element.use_legacy_sql + + flatten_results = element.flatten_results + + bq = bigquery_tools.BigQueryWrapper() + + try: + if element.query is not None: + self._setup_temporary_dataset(bq, query, use_legacy_sql) + table_reference = self._execute_query( + bq, query, use_legacy_sql, flatten_results) + + gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex) + + table_schema = bq.get_table( + table_reference.projectId, + table_reference.datasetId, + table_reference.tableId).schema + + if self.target_schema is None: + self.target_schema = bigquery_tools.parse_table_schema_from_json( + 
json.dumps(self.schema)) + + if not self.target_schema == table_schema: + raise ValueError(( + "Schema generated by reading from BQ doesn't match expected" + "schema.\nExpected: {}\nActual: {}").format( + self.target_schema, table_schema)) + + metadata_list = self._export_files(bq, table_reference, gcs_location) + + yield pvalue.TaggedOutput('location_to_cleanup', gcs_location) + for metadata in metadata_list: + yield metadata.path + + finally: + if query is not None: + bq.clean_up_temporary_dataset(self.project) + + def _setup_temporary_dataset(self, bq, query, use_legacy_sql): + location = bq.get_query_location(self.project, query, use_legacy_sql) + bq.create_temporary_dataset(self.project, location) + + def _execute_query(self, bq, query, use_legacy_sql, flatten_results): + job = bq._start_query_job( + self.project, + query, + use_legacy_sql, + flatten_results, + job_id=uuid.uuid4().hex, + kms_key=self.kms_key) + job_ref = job.jobReference + bq.wait_for_bq_job(job_ref) + return bq._get_temp_table(self.project) + + def _export_files(self, bq, table_reference, gcs_location): + """Runs a BigQuery export job. + + Returns: + a list of FileMetadata instances + """ + job_id = uuid.uuid4().hex + job_ref = bq.perform_extract_job([gcs_location], + job_id, + table_reference, + bigquery_tools.FileFormat.JSON, + include_header=False) + bq.wait_for_bq_job(job_ref) + metadata_list = FileSystems.match([gcs_location])[0].metadata_list + + return metadata_list + + +class _PassThroughThenCleanupWithSI(PTransform): + """A PTransform that invokes a DoFn after the input PCollection has been + processed. + + DoFn should have arguments (element, side_input, cleanup_signal). + + Utilizes readiness of PCollection to trigger DoFn. 
+ """
+ def __init__(self, cleanup_dofn, side_input):
+ self.cleanup_dofn = cleanup_dofn
+ self.side_input = side_input
+
+ def expand(self, input):
+ class PassThrough(beam.DoFn):
+ def process(self, element):
+ yield element
+
+ main_output, cleanup_signal = input | beam.ParDo(
+ PassThrough()).with_outputs(
+ 'cleanup_signal', main='main')
+
+ _ = (
+ input.pipeline
+ | beam.Create([None])
+ | beam.ParDo(
+ self.cleanup_dofn,
+ self.side_input,
+ beam.pvalue.AsSingleton(cleanup_signal)))
+
+ return main_output
+
+
+class ReadAllFromBigQueryRequest: Review comment: I worry that this is a little clunky - but I appreciate that it provides validation, and even type checking if necessary. Perhaps give it a shorter name so it's 'easy' to create. cc: @robertwb thoughts on a Pythonic PCollection element that describes a BigQuery Read action (read from table or read from query)? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 430947) Time Spent: 11h 20m (was: 11h 10m) > Add consistent slowly changing side inputs support > -------------------------------------------------- > > Key: BEAM-9650 > URL: https://issues.apache.org/jira/browse/BEAM-9650 > Project: Beam > Issue Type: Bug > Components: io-ideas > Reporter: Mikhail Gryzykhin > Assignee: Mikhail Gryzykhin > Priority: Major > Time Spent: 11h 20m > Remaining Estimate: 0h > > Add implementation for slowly changing dimensions based on [design > doc](https://docs.google.com/document/d/1LDY_CtsOJ8Y_zNv1QtkP6AGFrtzkj1q5EW_gSChOIvg/edit) -- This message was sent by Atlassian Jira (v8.3.4#803005)