[ 
https://issues.apache.org/jira/browse/BEAM-9650?focusedWorklogId=430934&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-430934
 ]

ASF GitHub Bot logged work on BEAM-9650:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/May/20 22:25
            Start Date: 05/May/20 22:25
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r420386613



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:

Review comment:
       why do you need a target_schema?

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()

Review comment:
       It would be great if the BQ wrapper could be passed a client as an 
argument, so that a mocked-out BQ client could be used.
   
   See 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L993
 and 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1047-L1048

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`True`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  @classmethod
+  def validate(cls):
+    if cls.table is not None and cls.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif cls.table is None and cls.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'
+
+    if self.gcs_location is not None:

Review comment:
       Note that this function runs at con struction time. This is before 
RuntimeValueProviders receive their value (this will work fine for string-type 
arguments, but won't work for runtime valueproviders).

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`True`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  @classmethod
+  def validate(cls):
+    if cls.table is not None and cls.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif cls.table is None and cls.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be

Review comment:
       I don't think you need a schema?

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected"
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+
+    DoFn should have arguments (element, side_input, cleanup_signal).
+
+    Utilizes readiness of PCollection to trigger DoFn.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+      PassThrough()).with_outputs(
+      'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
+  '''
+  Class that defines data to read from BQ.
+  '''
+  def __init__(
+      self,
+      query=None,
+      use_legacy_sql=False,
+      table=None,
+      flatten_results=False):
+    '''
+    Only one of query or table should be specified.
+
+    :param query(str): SQL query to fetch data.
+    :param use_legacy_sql(boolean):
+      Specifies whether to use BigQuery's legacy SQL dialect for this query.
+      The default value is :data:`False`. If set to :data:`True`,
+      the query will use BigQuery's updated SQL dialect with improved standards
+      compliance.
+      This parameter is ignored for table inputs.
+    :param table(str):
+      The ID of the table to read. The ID must contain only letters
+      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
+      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
+    :param flatten_results(boolean):
+      Flattens all nested and repeated fields in the query results.
+      The default value is :data:`True`.
+    '''
+    self.flatten_results = flatten_results
+    self.query = query
+    self.use_legacy_sql = use_legacy_sql
+    self.table = table
+    self.validate()
+
+  @classmethod
+  def validate(cls):
+    if cls.table is not None and cls.query is not None:
+      raise ValueError(
+          'Both a BigQuery table and a query were specified.'
+          ' Please specify only one of these.')
+    elif cls.table is None and cls.query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+
+
+@experimental()
+class ReadAllFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    PTransform:ReadAllFromBigQueryRequest->Rows
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    It is recommended not to use this PTransform for streaming jobs on
+    GlobalWindow, since it will not be able to cleanup snapshots.
+
+  Args:
+    gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+      bucket where the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+    project (str): The ID of the project containing this table.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?).
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    schema: The schema to be used if the BigQuery table to write has to be
+      created. This can be either specified as a 'bigquery.TableSchema' object
+      or a single string  of the form 'field1:type1,field2:type2,field3:type3'
+      that defines a comma separated list of fields. Here 'type' should
+      specify the BigQuery type of the field. Single string based schemas do
+      not support nested fields, repeated fields, or specifying a BigQuery
+      mode for fields (mode will always be set to 'NULLABLE').
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+   """
+  def __init__(
+      self,
+      gcs_location=None,
+      project=None,
+      validate=False,
+      coder=None,
+      schema=None,
+      flatten_results=True,
+      kms_key=None):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError(
+            '%s: gcs_location must be of type string'
+            ' or ValueProvider; got %r instead' %
+            (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+
+    if schema is None:
+      raise ValueError("Should provide schema.")
+
+    self.coder = coder
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.schema = bigquery_tools.get_dict_table_schema(schema)
+    self.kms_key = kms_key
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI pattern where the
+    extracted table should be written to.
+    """
+    file_pattern = '{}/bigquery-table-dump-*.json'

Review comment:
       FWIW it may be good to make this a constant somewhere

##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)

Review comment:
       I believe this location looks something like 
`gs://bucket/dir/hexstring1/hexstring2*.json`. right? Is that the appropriate 
URI format [expected by 
BQ](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationextract)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 430934)
    Time Spent: 11h 10m  (was: 11h)

> Add consistent slowly changing side inputs support
> --------------------------------------------------
>
>                 Key: BEAM-9650
>                 URL: https://issues.apache.org/jira/browse/BEAM-9650
>             Project: Beam
>          Issue Type: Bug
>          Components: io-ideas
>            Reporter: Mikhail Gryzykhin
>            Assignee: Mikhail Gryzykhin
>            Priority: Major
>          Time Spent: 11h 10m
>  Remaining Estimate: 0h
>
> Add implementation for slowly changing dimentions based on [design 
> doc](https://docs.google.com/document/d/1LDY_CtsOJ8Y_zNv1QtkP6AGFrtzkj1q5EW_gSChOIvg/edit]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to