[ 
https://issues.apache.org/jira/browse/BEAM-9650?focusedWorklogId=432687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432687
 ]

ASF GitHub Bot logged work on BEAM-9650:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/20 09:22
            Start Date: 11/May/20 09:22
    Worklog Time Spent: 10m 
      Work Description: Ardagan commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r422903621



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`True`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  @classmethod
+  def validate(cls):
+    if cls.table is not None and cls.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif cls.table is None and cls.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be

Review comment:
       See response above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 432687)
    Time Spent: 11h 50m  (was: 11h 40m)

> Add consistent slowly changing side inputs support
> --------------------------------------------------
>
>                 Key: BEAM-9650
>                 URL: https://issues.apache.org/jira/browse/BEAM-9650
>             Project: Beam
>          Issue Type: Bug
>          Components: io-ideas
>            Reporter: Mikhail Gryzykhin
>            Assignee: Mikhail Gryzykhin
>            Priority: Major
>          Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> Add implementation for slowly changing dimentions based on [design 
> doc](https://docs.google.com/document/d/1LDY_CtsOJ8Y_zNv1QtkP6AGFrtzkj1q5EW_gSChOIvg/edit]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to