ryanthompson591 commented on code in PR #23558:
URL: https://github.com/apache/beam/pull/23558#discussion_r992324815


##########
CHANGES.md:
##########
@@ -62,6 +62,7 @@
 * Decreased TextSource CPU utilization by 2.3x (Java) ([#23193](https://github.com/apache/beam/issues/23193)).
 * Fixed bug when using SpannerIO with RuntimeValueProvider options (Java) ([#22146](https://github.com/apache/beam/issues/22146)).
 * Fixed issue for unicode rendering on WriteToBigQuery ([#10785](https://github.com/apache/beam/issues/10785))
+* Remove obsolete variant of BigQuery Write ([#23564](https://github.com/apache/beam/issues/23564)).

Review Comment:
   Maybe note here that old code using this variant will be redirected to the
   new variant with a warning. This isn't a breaking change, right?



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1414,167 +1410,12 @@ def __next__(self):
 
 
 @deprecated(since='2.11.0', current="WriteToBigQuery")
-class BigQuerySink(dataflow_io.NativeSink):
-  """A sink based on a BigQuery table.
-
-  This BigQuery sink triggers a Dataflow native sink for BigQuery
-  that only supports batch pipelines.
-  Instead of using this sink directly, please use WriteToBigQuery
-  transform that works for both batch and streaming pipelines.
-  """
-  def __init__(
-      self,
-      table,
-      dataset=None,
-      project=None,
-      schema=None,
-      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
-      write_disposition=BigQueryDisposition.WRITE_EMPTY,
-      validate=False,
-      coder=None,
-      kms_key=None):
-    """Initialize a BigQuerySink.
-
-    Args:
-      table (str): The ID of the table. If **dataset** argument is :data:`None`
-        then the table argument must contain the entire table reference
-        specified as: ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``.
-      dataset (str): The ID of the dataset containing this table or
-        :data:`None` if the table reference is specified entirely by the table
-        argument.
-      project (str): The ID of the project containing this table or
-        :data:`None` if the table reference is specified entirely by the table
-        argument.
-      schema (str): The schema to be used if the BigQuery table to write has
-        to be created. This can be either specified as a
-        :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema` object or a single string  of the form
-        ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
-        separated list of fields. Here ``'type'`` should specify the BigQuery
-        type of the field. Single string based schemas do not support nested
-        fields, repeated fields, or specifying a BigQuery mode for fields (mode
-        will always be set to ``'NULLABLE'``).
-      create_disposition (BigQueryDisposition): A string describing what
-        happens if the table does not exist. Possible values are:
-
-          * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
-            exist.
-          * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
-            exist.
-
-      write_disposition (BigQueryDisposition): A string describing what
-        happens if the table has already some data. Possible values are:
-
-          * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
-          * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
-          * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
-            empty.
-
-      validate (bool): If :data:`True`, various checks will be done when sink
-        gets initialized (e.g., is table present given the disposition
-        arguments?). This should be :data:`True` for most scenarios in order to
-        catch errors as early as possible (pipeline construction instead of
-        pipeline execution). It should be :data:`False` if the table is created
-        during pipeline execution by a previous step.
-      coder (~apache_beam.coders.coders.Coder): The coder for the
-        table rows if serialized to disk. If :data:`None`, then the default
-        coder is :class:`~apache_beam.io.gcp.bigquery_tools.RowAsDictJsonCoder`,
-        which will interpret every element written to the sink as a dictionary
-        that will be JSON serialized as a line in a file. This argument needs a
-        value only in special cases when writing table rows as dictionaries is
-        not desirable.
-      kms_key (str): Optional Cloud KMS key name for use when creating new
-        tables.
-
-    Raises:
-      TypeError: if the schema argument is not a :class:`str` or a
-        :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
-bigquery_v2_messages.TableSchema` object.
-      ValueError: if the table reference as a string does not
-        match the expected format.
-    """
-    # Import here to avoid adding the dependency for local running scenarios.
-    try:
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apitools.base import py  # pylint: disable=unused-import
-    except ImportError:
-      raise ImportError(
-          'Google Cloud IO not available, '
-          'please install apache_beam[gcp]')
-
-    self.table_reference = bigquery_tools.parse_table_reference(
-        table, dataset, project)
-    # Transform the table schema into a bigquery.TableSchema instance.
-    if isinstance(schema, str):
-      # TODO(silviuc): Should add a regex-based validation of the format.
-      table_schema = bigquery.TableSchema()
-      schema_list = [s.strip(' ') for s in schema.split(',')]
-      for field_and_type in schema_list:
-        field_name, field_type = field_and_type.split(':')
-        field_schema = bigquery.TableFieldSchema()
-        field_schema.name = field_name
-        field_schema.type = field_type
-        field_schema.mode = 'NULLABLE'
-        table_schema.fields.append(field_schema)
-      self.table_schema = table_schema
-    elif schema is None:
-      # TODO(silviuc): Should check that table exists if no schema specified.
-      self.table_schema = schema
-    elif isinstance(schema, bigquery.TableSchema):
-      self.table_schema = schema
-    else:
-      raise TypeError('Unexpected schema argument: %s.' % schema)
-
-    self.create_disposition = BigQueryDisposition.validate_create(
-        create_disposition)
-    self.write_disposition = BigQueryDisposition.validate_write(
-        write_disposition)
-    self.validate = validate
-    self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
-    self.kms_key = kms_key
-
-  def display_data(self):
-    res = {}
-    if self.table_reference is not None:
-      tableSpec = '{}.{}'.format(
-          self.table_reference.datasetId, self.table_reference.tableId)
-      if self.table_reference.projectId is not None:
-        tableSpec = '{}:{}'.format(self.table_reference.projectId, tableSpec)
-      res['table'] = DisplayDataItem(tableSpec, label='Table')
-
-    res['validation'] = DisplayDataItem(
-        self.validate, label="Validation Enabled")
-    return res
-
-  def schema_as_json(self):
-    """Returns the TableSchema associated with the sink as a JSON string."""
-    def schema_list_as_object(schema_list):
-      """Returns a list of TableFieldSchema objects as a list of dicts."""
-      fields = []
-      for f in schema_list:
-        fs = {'name': f.name, 'type': f.type}
-        if f.description is not None:
-          fs['description'] = f.description
-        if f.mode is not None:
-          fs['mode'] = f.mode
-        if f.type.lower() == 'record':
-          fs['fields'] = schema_list_as_object(f.fields)
-        fields.append(fs)
-      return fields
-
-    return json.dumps(
-        {'fields': schema_list_as_object(self.table_schema.fields)})
-
-  @property
-  def format(self):
-    """Sink format name required for remote execution."""
-    return 'bigquery'
-
-  def writer(self, test_bigquery_client=None, buffer_size=None):
-    return bigquery_tools.BigQueryWriter(
-        sink=self,
-        test_bigquery_client=test_bigquery_client,
-        buffer_size=buffer_size)
+def BigQuerySink(*args, **kwargs):

Review Comment:
   If it makes sense, add a test to make sure this fallback works.
   
   Any code calling the fallback will expect the same interface as the
   deprecated BigQuerySink, right? Is the interface exactly the same, or could
   users get runtime errors in unexpected places?
   
   If there are unexpected runtime errors, consider fully sunsetting it instead.
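
A rough sketch of the kind of test meant here, in case it helps. It assumes the fallback simply emits a deprecation warning and forwards its arguments to `WriteToBigQuery`; the PR body is not visible in this hunk, so that shape is a guess. The `WriteToBigQuery` class below is a simplified, hypothetical stand-in (not the real Beam transform), and `check_fallback` is an illustrative helper name, so the snippet runs without Beam installed.

```python
import warnings


class WriteToBigQuery:
  """Hypothetical stand-in for the real transform (simplified signature)."""
  def __init__(self, table, dataset=None, project=None, schema=None, **kwargs):
    self.table = table
    self.dataset = dataset
    self.project = project
    self.schema = schema
    # Anything the stand-in does not name explicitly lands here.
    self.extra = kwargs


def BigQuerySink(*args, **kwargs):
  """Assumed fallback shape: warn, then forward to WriteToBigQuery."""
  warnings.warn(
      'BigQuerySink is deprecated; use WriteToBigQuery instead.',
      DeprecationWarning,
      stacklevel=2)
  return WriteToBigQuery(*args, **kwargs)


def check_fallback():
  """Calls the fallback the way old user code would and records warnings."""
  with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    sink = BigQuerySink('dataset.table', schema='name:STRING', validate=False)
  return sink, caught


sink, caught = check_fallback()
assert isinstance(sink, WriteToBigQuery)
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```

A real test would call the fallback with every argument the removed class accepted (`coder` and `validate` in particular) against the actual `WriteToBigQuery`, since an argument the new transform rejects would only surface as a `TypeError` at pipeline construction.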


