This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b5486cf37a3 Remove obsolete native (dataflow) io from Beam Python. (#27444)
b5486cf37a3 is described below

commit b5486cf37a31863da9ce4837b6547628031d1bd3
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Wed Jul 12 13:16:36 2023 -0700

    Remove obsolete native (dataflow) io from Beam Python. (#27444)
    
    This was only needed for Runner v1.
---
 sdks/python/apache_beam/io/gcp/pubsub.py           |  26 +-
 sdks/python/apache_beam/io/iobase.py               |  43 +--
 sdks/python/apache_beam/pipeline_test.py           |  53 +---
 .../runners/dataflow/native_io/__init__.py         |  16 -
 .../runners/dataflow/native_io/iobase.py           | 342 ---------------------
 .../runners/dataflow/native_io/iobase_test.py      | 203 ------------
 .../apache_beam/runners/direct/direct_runner.py    |  17 +-
 .../runners/direct/transform_evaluator.py          |  82 +----
 .../runners/worker/bundle_processor_test.py        |   1 -
 9 files changed, 45 insertions(+), 738 deletions(-)

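Editorial note (not part of the commit): the classes removed below (NativeSource, NativeSink, _NativeWrite) were internal plumbing used only by Dataflow Runner v1; the public Pub/Sub transforms remain the supported path. The following minimal sketch is illustrative only — the project, subscription, and topic names are placeholders — and shows the user-facing API that is unaffected by this change:

    # Illustrative sketch only; resource names are placeholders.
    # ReadFromPubSub/WriteToPubSub are the public transforms; the internal
    # NativeSource/NativeSink machinery they formerly wrapped is what this
    # commit removes.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | beam.io.ReadFromPubSub(
                subscription='projects/my-project/subscriptions/my-sub')
            | beam.Map(lambda msg: msg.decode('utf-8'))
            | beam.Map(lambda s: s.encode('utf-8'))
            | beam.io.WriteToPubSub(
                topic='projects/my-project/topics/my-topic'))
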
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 8cee8acfebb..af58006d6e7 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -39,9 +39,9 @@ from typing import Optional
 from typing import Tuple
 
 from apache_beam import coders
+from apache_beam.io import iobase
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
-from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import Flatten
 from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
@@ -261,6 +261,7 @@ class ReadFromPubSub(PTransform):
         timestamp_attribute=timestamp_attribute)
 
   def expand(self, pvalue):
+    # TODO(BEAM-27443): Apply a proper transform rather than Read.
     pcoll = pvalue.pipeline | Read(self._source)
     pcoll.element_type = bytes
     if self.with_attributes:
@@ -423,7 +424,8 @@ def parse_subscription(full_subscription):
   return project, subscription_name
 
 
-class _PubSubSource(dataflow_io.NativeSource):
+# TODO(BEAM-27443): Remove (or repurpose as a proper PTransform).
+class _PubSubSource(iobase.SourceBase):
   """Source for a Cloud Pub/Sub topic or subscription.
 
   This ``NativeSource`` is overridden by a native Pubsub implementation.
@@ -460,11 +462,6 @@ class _PubSubSource(dataflow_io.NativeSource):
     if subscription:
       self.project, self.subscription_name = parse_subscription(subscription)
 
-  @property
-  def format(self):
-    """Source format name required for remote execution."""
-    return 'pubsub'
-
   def display_data(self):
     return {
         'id_label': DisplayDataItem(self.id_label,
@@ -480,14 +477,15 @@ class _PubSubSource(dataflow_io.NativeSource):
             label='Timestamp Attribute').drop_if_none(),
     }
 
-  def reader(self):
-    raise NotImplementedError
+  def default_output_coder(self):
+    return self.coder
 
   def is_bounded(self):
     return False
 
 
-class _PubSubSink(dataflow_io.NativeSink):
+# TODO(BEAM-27443): Remove in favor of a proper WriteToPubSub transform.
+class _PubSubSink(object):
   """Sink for a Cloud Pub/Sub topic.
 
   This ``NativeSource`` is overridden by a native Pubsub implementation.
@@ -505,14 +503,6 @@ class _PubSubSink(dataflow_io.NativeSink):
 
     self.project, self.topic_name = parse_topic(topic)
 
-  @property
-  def format(self):
-    """Sink format name required for remote execution."""
-    return 'pubsub'
-
-  def writer(self):
-    raise NotImplementedError
-
 
 class PubSubSourceDescriptor(NamedTuple):
   """A PubSub source descriptor for ``MultipleReadFromPubSub```
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 6d75d520af5..e15205ead4d 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -101,6 +101,9 @@ class SourceBase(HasDisplayData, urns.RunnerApiFn):
   """
   urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_SOURCE)
 
+  def default_output_coder(self):
+    raise NotImplementedError
+
   def is_bounded(self):
     # type: () -> bool
     raise NotImplementedError
@@ -923,11 +926,8 @@ class Read(ptransform.PTransform):
 
   def _infer_output_coder(self, input_type=None, input_coder=None):
     # type: (...) -> Optional[coders.Coder]
-    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
-    if isinstance(self.source, BoundedSource):
+    if isinstance(self.source, SourceBase):
       return self.source.default_output_coder()
-    elif isinstance(self.source, dataflow_io.NativeSource):
-      return self.source.coder
     else:
       return None
 
@@ -941,18 +941,17 @@ class Read(ptransform.PTransform):
       self,
       context: PipelineContext,
   ) -> Tuple[str, Any]:
-    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
-    if isinstance(self.source, (BoundedSource, dataflow_io.NativeSource)):
-      from apache_beam.io.gcp.pubsub import _PubSubSource
-      if isinstance(self.source, _PubSubSource):
-        return (
-            common_urns.composites.PUBSUB_READ.urn,
-            beam_runner_api_pb2.PubSubReadPayload(
-                topic=self.source.full_topic,
-                subscription=self.source.full_subscription,
-                timestamp_attribute=self.source.timestamp_attribute,
-                with_attributes=self.source.with_attributes,
-                id_attribute=self.source.id_label))
+    from apache_beam.io.gcp.pubsub import _PubSubSource
+    if isinstance(self.source, _PubSubSource):
+      return (
+          common_urns.composites.PUBSUB_READ.urn,
+          beam_runner_api_pb2.PubSubReadPayload(
+              topic=self.source.full_topic,
+              subscription=self.source.full_subscription,
+              timestamp_attribute=self.source.timestamp_attribute,
+              with_attributes=self.source.with_attributes,
+              id_attribute=self.source.id_label))
+    if isinstance(self.source, BoundedSource):
       return (
           common_urns.deprecated_primitives.READ.urn,
           beam_runner_api_pb2.ReadPayload(
@@ -976,6 +975,7 @@ class Read(ptransform.PTransform):
     if transform.spec.urn == common_urns.composites.PUBSUB_READ.urn:
       assert isinstance(payload, beam_runner_api_pb2.PubSubReadPayload)
       # Importing locally to prevent circular dependencies.
+      # TODO(BEAM-27443): Remove the need for this.
       from apache_beam.io.gcp.pubsub import _PubSubSource
       source = _PubSubSource(
           topic=payload.topic or None,
@@ -1015,6 +1015,7 @@ ptransform.PTransform.register_urn(
     Read._from_runner_api_parameter_read,
 )
 
+# TODO(BEAM-27443): Remove.
 ptransform.PTransform.register_urn(
     common_urns.composites.PUBSUB_READ.urn,
     beam_runner_api_pb2.PubSubReadPayload,
@@ -1065,10 +1066,11 @@ class Write(ptransform.PTransform):
     return {'sink': self.sink.__class__, 'sink_dd': self.sink}
 
   def expand(self, pcoll):
-    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
-    if isinstance(self.sink, dataflow_io.NativeSink):
-      # A native sink
-      return pcoll | 'NativeWrite' >> dataflow_io._NativeWrite(self.sink)
+    # Importing locally to prevent circular dependencies.
+    from apache_beam.io.gcp.pubsub import _PubSubSink
+    if isinstance(self.sink, _PubSubSink):
+      # TODO(BEAM-27443): Remove the need for special casing here.
+      return pvalue.PDone(pcoll.pipeline)
     elif isinstance(self.sink, Sink):
       # A custom sink
       return pcoll | WriteImpl(self.sink)
@@ -1084,6 +1086,7 @@ class Write(ptransform.PTransform):
       self,
       context: PipelineContext,
   ) -> Tuple[str, Any]:
+    # TODO(BEAM-27443): Remove the need for special casing here.
     # Importing locally to prevent circular dependencies.
     from apache_beam.io.gcp.pubsub import _PubSubSink
     if isinstance(self.sink, _PubSubSink):
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 18ab0f091aa..c9ac4ce4c13 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -30,7 +30,7 @@ import apache_beam as beam
 from apache_beam import typehints
 from apache_beam.coders import BytesCoder
 from apache_beam.io import Read
-from apache_beam.metrics import Metrics
+from apache_beam.io.iobase import SourceBase
 from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
@@ -40,7 +40,6 @@ from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.pvalue import AsSingleton
 from apache_beam.pvalue import TaggedOutput
-from apache_beam.runners.dataflow.native_io.iobase import NativeSource
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -61,39 +60,9 @@ from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
-# TODO(BEAM-1555): Test is failing on the service, with FakeSource.
 
-
-class FakeSource(NativeSource):
-  """Fake source returning a fixed list of values."""
-  class _Reader(object):
-    def __init__(self, vals):
-      self._vals = vals
-      self._output_counter = Metrics.counter('main', 'outputs')
-
-    def __enter__(self):
-      return self
-
-    def __exit__(self, exception_type, exception_value, traceback):
-      pass
-
-    def __iter__(self):
-      for v in self._vals:
-        self._output_counter.inc()
-        yield v
-
-  def __init__(self, vals):
-    self._vals = vals
-
-  def reader(self):
-    return FakeSource._Reader(self._vals)
-
-
-class FakeUnboundedSource(NativeSource):
+class FakeUnboundedSource(SourceBase):
   """Fake unbounded source. Does not work at runtime"""
-  def reader(self):
-    return None
-
   def is_bounded(self):
     return False
 
@@ -259,24 +228,6 @@ class PipelineTest(unittest.TestCase):
       pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
       assert_that(pcoll, equal_to([[1, 2, 3]]))
 
-  # TODO(BEAM-1555): Test is failing on the service, with FakeSource.
-  # @pytest.mark.it_validatesrunner
-  def test_metrics_in_fake_source(self):
-    pipeline = TestPipeline()
-    pcoll = pipeline | Read(FakeSource([1, 2, 3, 4, 5, 6]))
-    assert_that(pcoll, equal_to([1, 2, 3, 4, 5, 6]))
-    res = pipeline.run()
-    metric_results = res.metrics().query()
-    outputs_counter = metric_results['counters'][0]
-    self.assertEqual(outputs_counter.key.step, 'Read')
-    self.assertEqual(outputs_counter.key.metric.name, 'outputs')
-    self.assertEqual(outputs_counter.committed, 6)
-
-  def test_fake_read(self):
-    with TestPipeline() as pipeline:
-      pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
-      assert_that(pcoll, equal_to([1, 2, 3]))
-
   def test_visit_entire_graph(self):
     pipeline = Pipeline()
     pcoll1 = pipeline | 'pcoll' >> beam.Impulse()
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
deleted file mode 100644
index cce3acad34a..00000000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
deleted file mode 100644
index 3d1afe54690..00000000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ /dev/null
@@ -1,342 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Dataflow native sources and sinks.
-
-For internal use only; no backwards-compatibility guarantees.
-"""
-
-# pytype: skip-file
-
-import logging
-from typing import TYPE_CHECKING
-from typing import Optional
-
-from apache_beam import pvalue
-from apache_beam.io import iobase
-from apache_beam.transforms import ptransform
-from apache_beam.transforms.display import HasDisplayData
-
-if TYPE_CHECKING:
-  from apache_beam import coders
-
-_LOGGER = logging.getLogger(__name__)
-
-
-def _dict_printable_fields(dict_object, skip_fields):
-  """Returns a list of strings for the interesting fields of a dict."""
-  return [
-      '%s=%r' % (name, value) for name,
-      value in dict_object.items()
-      # want to output value 0 but not None nor []
-      if (value or value == 0) and name not in skip_fields
-  ]
-
-
-_minor_fields = [
-    'coder',
-    'key_coder',
-    'value_coder',
-    'config_bytes',
-    'elements',
-    'append_trailing_newlines',
-    'strip_trailing_newlines',
-    'compression_type'
-]
-
-
-class NativeSource(iobase.SourceBase):
-  """A source implemented by Dataflow service.
-
-  This class is to be only inherited by sources natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-
-  This class is deprecated and should not be used to define new sources.
-  """
-  coder = None  # type: Optional[coders.Coder]
-
-  def reader(self):
-    """Returns a NativeSourceReader instance associated with this source."""
-    raise NotImplementedError
-
-  def is_bounded(self):
-    return True
-
-  def __repr__(self):
-    return '<{name} {vals}>'.format(
-        name=self.__class__.__name__,
-        vals=', '.join(_dict_printable_fields(self.__dict__, _minor_fields)))
-
-
-class NativeSourceReader(object):
-  """A reader for a source implemented by Dataflow service."""
-  def __enter__(self):
-    """Opens everything necessary for a reader to function properly."""
-    raise NotImplementedError
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    """Cleans up after a reader executed."""
-    raise NotImplementedError
-
-  def __iter__(self):
-    """Returns an iterator over all the records of the source."""
-    raise NotImplementedError
-
-  @property
-  def returns_windowed_values(self):
-    """Returns whether this reader returns windowed values."""
-    return False
-
-  def get_progress(self):
-    """Returns a representation of how far the reader has read.
-
-    Returns:
-      A SourceReaderProgress object that gives the current progress of the
-      reader.
-    """
-
-  def request_dynamic_split(self, dynamic_split_request):
-    """Attempts to split the input in two parts.
-
-    The two parts are named the "primary" part and the "residual" part. The
-    current 'NativeSourceReader' keeps processing the primary part, while the
-    residual part will be processed elsewhere (e.g. perhaps on a different
-    worker).
-
-    The primary and residual parts, if concatenated, must represent the
-    same input as the current input of this 'NativeSourceReader' before this
-    call.
-
-    The boundary between the primary part and the residual part is
-    specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
-    if the framework supports the notion of positions, it might be a
-    position at which the input is asked to split itself (which is not
-    necessarily the same position at which it *will* split itself); it
-    might be an approximate fraction of input, or something else.
-
-    This function returns a 'DynamicSplitResult', which encodes, in a
-    framework-specific way, the information sufficient to construct a
-    description of the resulting primary and residual inputs. For example, it
-    might, again, be a position demarcating these parts, or it might be a pair
-    of fully-specified input descriptions, or something else.
-
-    After a successful call to 'request_dynamic_split()', subsequent calls
-    should be interpreted relative to the new primary.
-
-    Args:
-      dynamic_split_request: A 'DynamicSplitRequest' describing the split
-        request.
-
-    Returns:
-      'None' if the 'DynamicSplitRequest' cannot be honored (in that
-      case the input represented by this 'NativeSourceReader' stays the same),
-      or a 'DynamicSplitResult' describing how the input was split into a
-      primary and residual part.
-    """
-    _LOGGER.debug(
-        'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
-        'split request: %r',
-        self,
-        dynamic_split_request)
-
-
-class ReaderProgress(object):
-  """A representation of how far a NativeSourceReader has read."""
-  def __init__(
-      self,
-      position=None,
-      percent_complete=None,
-      remaining_time=None,
-      consumed_split_points=None,
-      remaining_split_points=None):
-
-    self._position = position
-
-    if percent_complete is not None:
-      percent_complete = float(percent_complete)
-      if percent_complete < 0 or percent_complete > 1:
-        raise ValueError(
-            'The percent_complete argument was %f. Must be in range [0, 1].' %
-            percent_complete)
-    self._percent_complete = percent_complete
-
-    self._remaining_time = remaining_time
-    self._consumed_split_points = consumed_split_points
-    self._remaining_split_points = remaining_split_points
-
-  @property
-  def position(self):
-    """Returns progress, represented as a ReaderPosition object."""
-    return self._position
-
-  @property
-  def percent_complete(self):
-    """Returns progress, represented as a percentage of total work.
-
-    Progress range from 0.0 (beginning, nothing complete) to 1.0 (end of the
-    work range, entire WorkItem complete).
-
-    Returns:
-      Progress represented as a percentage of total work.
-    """
-    return self._percent_complete
-
-  @property
-  def remaining_time(self):
-    """Returns progress, represented as an estimated time remaining."""
-    return self._remaining_time
-
-  @property
-  def consumed_split_points(self):
-    return self._consumed_split_points
-
-  @property
-  def remaining_split_points(self):
-    return self._remaining_split_points
-
-
-class ReaderPosition(object):
-  """A representation of position in an iteration of a 'NativeSourceReader'."""
-  def __init__(
-      self,
-      end=None,
-      key=None,
-      byte_offset=None,
-      record_index=None,
-      shuffle_position=None,
-      concat_position=None):
-    """Initializes ReaderPosition.
-
-    A ReaderPosition may get instantiated for one of these position types. Only
-    one of these should be specified.
-
-    Args:
-      end: position is past all other positions. For example, this may be used
-        to represent the end position of an unbounded range.
-      key: position is a string key.
-      byte_offset: position is a byte offset.
-      record_index: position is a record index
-      shuffle_position: position is a base64 encoded shuffle position.
-      concat_position: position is a 'ConcatPosition'.
-    """
-
-    self.end = end
-    self.key = key
-    self.byte_offset = byte_offset
-    self.record_index = record_index
-    self.shuffle_position = shuffle_position
-
-    if concat_position is not None:
-      assert isinstance(concat_position, ConcatPosition)
-    self.concat_position = concat_position
-
-
-class ConcatPosition(object):
-  """A position that encapsulate an inner position and an index.
-
-  This is used to represent the position of a source that encapsulate several
-  other sources.
-  """
-  def __init__(self, index, position):
-    """Initializes ConcatPosition.
-
-    Args:
-      index: index of the source currently being read.
-      position: inner position within the source currently being read.
-    """
-
-    if position is not None:
-      assert isinstance(position, ReaderPosition)
-    self.index = index
-    self.position = position
-
-
-class DynamicSplitRequest(object):
-  """Specifies how 'NativeSourceReader.request_dynamic_split' should split.
-  """
-  def __init__(self, progress):
-    assert isinstance(progress, ReaderProgress)
-    self.progress = progress
-
-
-class DynamicSplitResult(object):
-  pass
-
-
-class DynamicSplitResultWithPosition(DynamicSplitResult):
-  def __init__(self, stop_position):
-    assert isinstance(stop_position, ReaderPosition)
-    self.stop_position = stop_position
-
-
-class NativeSink(HasDisplayData):
-  """A sink implemented by Dataflow service.
-
-  This class is to be only inherited by sinks natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-  """
-  def writer(self):
-    """Returns a SinkWriter for this source."""
-    raise NotImplementedError
-
-  def __repr__(self):
-    return '<{name} {vals}>'.format(
-        name=self.__class__.__name__,
-        vals=_dict_printable_fields(self.__dict__, _minor_fields))
-
-
-class NativeSinkWriter(object):
-  """A writer for a sink implemented by Dataflow service."""
-  def __enter__(self):
-    """Opens everything necessary for a writer to function properly."""
-    raise NotImplementedError
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    """Cleans up after a writer executed."""
-    raise NotImplementedError
-
-  @property
-  def takes_windowed_values(self):
-    """Returns whether this writer takes windowed values."""
-    return False
-
-  def Write(self, o):  # pylint: disable=invalid-name
-    """Writes a record to the sink associated with this writer."""
-    raise NotImplementedError
-
-
-class _NativeWrite(ptransform.PTransform):
-  """A PTransform for writing to a Dataflow native sink.
-
-  These are sinks that are implemented natively by the Dataflow service
-  and hence should not be updated by users. These sinks are processed
-  using a Dataflow native write transform.
-
-  Applying this transform results in a ``pvalue.PDone``.
-  """
-  def __init__(self, sink):
-    """Initializes a Write transform.
-
-    Args:
-      sink: Sink to use for the write
-    """
-    super().__init__()
-    self.sink = sink
-
-  def expand(self, pcoll):
-    self._check_pcollection(pcoll)
-    return pvalue.PDone(pcoll.pipeline)
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
deleted file mode 100644
index 5e72ca555b6..00000000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests corresponding to Dataflow's iobase module."""
-
-# pytype: skip-file
-
-import unittest
-
-from apache_beam import Create
-from apache_beam import error
-from apache_beam import pvalue
-from apache_beam.runners.dataflow.native_io.iobase import ConcatPosition
-from apache_beam.runners.dataflow.native_io.iobase import DynamicSplitRequest
-from apache_beam.runners.dataflow.native_io.iobase import DynamicSplitResultWithPosition
-from apache_beam.runners.dataflow.native_io.iobase import NativeSink
-from apache_beam.runners.dataflow.native_io.iobase import NativeSinkWriter
-from apache_beam.runners.dataflow.native_io.iobase import NativeSource
-from apache_beam.runners.dataflow.native_io.iobase import ReaderPosition
-from apache_beam.runners.dataflow.native_io.iobase import ReaderProgress
-from apache_beam.runners.dataflow.native_io.iobase import _dict_printable_fields
-from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite
-from apache_beam.testing.test_pipeline import TestPipeline
-
-
-class TestHelperFunctions(unittest.TestCase):
-  def test_dict_printable_fields(self):
-    dict_object = {
-        'key_alpha': '1',
-        'key_beta': None,
-        'key_charlie': [],
-        'key_delta': 2.0,
-        'key_echo': 'skip_me',
-        'key_fox': 0
-    }
-    skip_fields = [
-        'key_echo',
-    ]
-    self.assertEqual(
-        sorted(_dict_printable_fields(dict_object, skip_fields)),
-        ["key_alpha='1'", 'key_delta=2.0', 'key_fox=0'])
-
-
-class TestNativeSource(unittest.TestCase):
-  def test_reader_method(self):
-    native_source = NativeSource()
-    self.assertRaises(NotImplementedError, native_source.reader)
-
-  def test_repr_method(self):
-    class FakeSource(NativeSource):
-      """A fake source modeled after BigQuerySource, which inherits from
-      NativeSource."""
-      def __init__(
-          self,
-          table=None,
-          dataset=None,
-          project=None,
-          query=None,
-          validate=False,
-          coder=None,
-          use_std_sql=False,
-          flatten_results=True):
-        self.validate = validate
-
-    fake_source = FakeSource()
-    self.assertEqual(fake_source.__repr__(), '<FakeSource validate=False>')
-
-
-class TestReaderProgress(unittest.TestCase):
-  def test_out_of_bounds_percent_complete(self):
-    with self.assertRaises(ValueError):
-      ReaderProgress(percent_complete=-0.1)
-    with self.assertRaises(ValueError):
-      ReaderProgress(percent_complete=1.1)
-
-  def test_position_property(self):
-    reader_progress = ReaderProgress(position=ReaderPosition())
-    self.assertEqual(type(reader_progress.position), ReaderPosition)
-
-  def test_percent_complete_property(self):
-    reader_progress = ReaderProgress(percent_complete=0.5)
-    self.assertEqual(reader_progress.percent_complete, 0.5)
-
-
-class TestReaderPosition(unittest.TestCase):
-  def test_invalid_concat_position_type(self):
-    with self.assertRaises(AssertionError):
-      ReaderPosition(concat_position=1)
-
-  def test_valid_concat_position_type(self):
-    ReaderPosition(concat_position=ConcatPosition(None, None))
-
-
-class TestConcatPosition(unittest.TestCase):
-  def test_invalid_position_type(self):
-    with self.assertRaises(AssertionError):
-      ConcatPosition(None, position=1)
-
-  def test_valid_position_type(self):
-    ConcatPosition(None, position=ReaderPosition())
-
-
-class TestDynamicSplitRequest(unittest.TestCase):
-  def test_invalid_progress_type(self):
-    with self.assertRaises(AssertionError):
-      DynamicSplitRequest(progress=1)
-
-  def test_valid_progress_type(self):
-    DynamicSplitRequest(progress=ReaderProgress())
-
-
-class TestDynamicSplitResultWithPosition(unittest.TestCase):
-  def test_invalid_stop_position_type(self):
-    with self.assertRaises(AssertionError):
-      DynamicSplitResultWithPosition(stop_position=1)
-
-  def test_valid_stop_position_type(self):
-    DynamicSplitResultWithPosition(stop_position=ReaderPosition())
-
-
-class TestNativeSink(unittest.TestCase):
-  def test_writer_method(self):
-    native_sink = NativeSink()
-    self.assertRaises(NotImplementedError, native_sink.writer)
-
-  def test_repr_method(self):
-    class FakeSink(NativeSink):
-      """A fake sink modeled after BigQuerySink, which inherits from
-      NativeSink."""
-      def __init__(
-          self,
-          validate=False,
-          dataset=None,
-          project=None,
-          schema=None,
-          create_disposition='create',
-          write_disposition=None,
-          coder=None):
-        self.validate = validate
-
-    fake_sink = FakeSink()
-    self.assertEqual(fake_sink.__repr__(), "<FakeSink ['validate=False']>")
-
-  def test_on_direct_runner(self):
-    class FakeSink(NativeSink):
-      """A fake sink outputing a number of elements."""
-      def __init__(self):
-        self.written_values = []
-        self.writer_instance = FakeSinkWriter(self.written_values)
-
-      def writer(self):
-        return self.writer_instance
-
-    class FakeSinkWriter(NativeSinkWriter):
-      """A fake sink writer for testing."""
-      def __init__(self, written_values):
-        self.written_values = written_values
-
-      def __enter__(self):
-        return self
-
-      def __exit__(self, *unused_args):
-        pass
-
-      def Write(self, value):
-        self.written_values.append(value)
-
-    with TestPipeline() as p:
-      sink = FakeSink()
-      p | Create(['a', 'b', 'c']) | _NativeWrite(sink)  # pylint: disable=expression-not-assigned
-
-    self.assertEqual(['a', 'b', 'c'], sorted(sink.written_values))
-
-
-class Test_NativeWrite(unittest.TestCase):
-  def setUp(self):
-    self.native_sink = NativeSink()
-    self.native_write = _NativeWrite(self.native_sink)
-
-  def test_expand_method_pcollection_errors(self):
-    with self.assertRaises(error.TransformError):
-      self.native_write.expand(None)
-    with self.assertRaises(error.TransformError):
-      pcoll = pvalue.PCollection(pipeline=None)
-      self.native_write.expand(pcoll)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 6466bc7752b..db53e4122bb 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -72,9 +72,9 @@ class SwitchingDirectRunner(PipelineRunner):
   def run_pipeline(self, pipeline, options):
 
     from apache_beam.pipeline import PipelineVisitor
-    from apache_beam.runners.dataflow.native_io.iobase import NativeSource
-    from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite
     from apache_beam.testing.test_stream import TestStream
+    from apache_beam.io.gcp.pubsub import ReadFromPubSub
+    from apache_beam.io.gcp.pubsub import WriteToPubSub
 
     class _FnApiRunnerSupportVisitor(PipelineVisitor):
       """Visitor determining if a Pipeline can be run on the FnApiRunner."""
@@ -83,18 +83,17 @@ class SwitchingDirectRunner(PipelineRunner):
         pipeline.visit(self)
         return self.supported_by_fnapi_runner
 
+      def enter_composite_transform(self, applied_ptransform):
+        # The FnApiRunner does not support streaming execution.
+        if isinstance(applied_ptransform.transform,
+                      (ReadFromPubSub, WriteToPubSub)):
+          self.supported_by_fnapi_runner = False
+
       def visit_transform(self, applied_ptransform):
         transform = applied_ptransform.transform
         # The FnApiRunner does not support streaming execution.
         if isinstance(transform, TestStream):
           self.supported_by_fnapi_runner = False
-        # The FnApiRunner does not support reads from NativeSources.
-        if (isinstance(transform, beam.io.Read) and
-            isinstance(transform.source, NativeSource)):
-          self.supported_by_fnapi_runner = False
-        # The FnApiRunner does not support the use of _NativeWrites.
-        if isinstance(transform, _NativeWrite):
-          self.supported_by_fnapi_runner = False
         if isinstance(transform, beam.ParDo):
           dofn = transform.dofn
           # The FnApiRunner does not support execution of CombineFns with
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index bfb27c4adc0..37004c7258a 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -39,7 +39,6 @@ from apache_beam.internal import pickler
 from apache_beam.runners import common
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
-from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
 from apache_beam.runners.direct.direct_runner import _GroupByKeyOnly
 from apache_beam.runners.direct.direct_runner import _StreamingGroupAlsoByWindow
@@ -106,7 +105,6 @@ class TransformEvaluatorRegistry(object):
         _GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
         _StreamingGroupByKeyOnly: _StreamingGroupByKeyOnlyEvaluator,
         _StreamingGroupAlsoByWindow: _StreamingGroupAlsoByWindowEvaluator,
-        _NativeWrite: _NativeWriteEvaluator,
         _TestStream: _TestStreamEvaluator,
         ProcessElements: _ProcessElementsEvaluator,
         _WatermarkController: _WatermarkControllerEvaluator,
@@ -172,11 +170,10 @@ class TransformEvaluatorRegistry(object):
     Returns:
       True if executor should execute applied_ptransform serially.
     """
-    if isinstance(applied_ptransform.transform,
-                  (_GroupByKeyOnly,
-                   _StreamingGroupByKeyOnly,
-                   _StreamingGroupAlsoByWindow,
-                   _NativeWrite)):
+    if isinstance(
+        applied_ptransform.transform,
+        (_GroupByKeyOnly, _StreamingGroupByKeyOnly,
+         _StreamingGroupAlsoByWindow)):
       return True
     elif (isinstance(applied_ptransform.transform, core.ParDo) and
           is_stateful_dofn(applied_ptransform.transform.dofn)):
@@ -1125,77 +1122,6 @@ class _StreamingGroupAlsoByWindowEvaluator(_TransformEvaluator):
     return TransformResult(self, bundles, [], None, self.keyed_holds)
 
 
-class _NativeWriteEvaluator(_TransformEvaluator):
-  """TransformEvaluator for _NativeWrite transform."""
-
-  ELEMENTS_TAG = _ListStateTag('elements')
-
-  def __init__(
-      self,
-      evaluation_context,
-      applied_ptransform,
-      input_committed_bundle,
-      side_inputs):
-    assert not side_inputs
-    super().__init__(
-        evaluation_context,
-        applied_ptransform,
-        input_committed_bundle,
-        side_inputs)
-
-    assert applied_ptransform.transform.sink
-    self._sink = applied_ptransform.transform.sink
-
-  @property
-  def _is_final_bundle(self):
-    return (
-        self._execution_context.watermarks.input_watermark ==
-        WatermarkManager.WATERMARK_POS_INF)
-
-  @property
-  def _has_already_produced_output(self):
-    return (
-        self._execution_context.watermarks.output_watermark ==
-        WatermarkManager.WATERMARK_POS_INF)
-
-  def start_bundle(self):
-    self.global_state = self._step_context.get_keyed_state(None)
-
-  def process_timer(self, timer_firing):
-    # We do not need to emit a KeyedWorkItem to process_element().
-    pass
-
-  def process_element(self, element):
-    self.global_state.add_state(
-        None, _NativeWriteEvaluator.ELEMENTS_TAG, element)
-
-  def finish_bundle(self):
-    # finish_bundle will append incoming bundles in memory until all the bundles
-    # carrying data is processed. This is done to produce only a single output
-    # shard (some tests depends on this behavior). It is possible to have
-    # incoming empty bundles after the output is produced, these bundles will be
-    # ignored and would not generate additional output files.
-    # TODO(altay): Do not wait until the last bundle to write in a single shard.
-    if self._is_final_bundle:
-      elements = self.global_state.get_state(
-          None, _NativeWriteEvaluator.ELEMENTS_TAG)
-      if self._has_already_produced_output:
-        # Ignore empty bundles that arrive after the output is produced.
-        assert elements == []
-      else:
-        self._sink.pipeline_options = self._evaluation_context.pipeline_options
-        with self._sink.writer() as writer:
-          for v in elements:
-            writer.Write(v.value)
-      hold = WatermarkManager.WATERMARK_POS_INF
-    else:
-      hold = WatermarkManager.WATERMARK_NEG_INF
-      self.global_state.set_timer(
-          None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
-
-    return TransformResult(self, [], [], None, {None: hold})
-
-
 class _ProcessElementsEvaluator(_TransformEvaluator):
   """An evaluator for sdf_direct_runner.ProcessElements transform."""
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
index 8b81c9f17ac..8072089f2ac 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
@@ -20,7 +20,6 @@
 
 import unittest
 from typing import Dict
-from typing import List
 
 import apache_beam as beam
 from apache_beam.coders.coders import FastPrimitivesCoder

