This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new b5486cf37a3  Remove obsolete native (dataflow) io from Beam Python. (#27444)
b5486cf37a3 is described below

commit b5486cf37a31863da9ce4837b6547628031d1bd3
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Wed Jul 12 13:16:36 2023 -0700

    Remove obsolete native (dataflow) io from Beam Python. (#27444)

    This was only needed for Runner v1.
---
 sdks/python/apache_beam/io/gcp/pubsub.py           |  26 +-
 sdks/python/apache_beam/io/iobase.py               |  43 +--
 sdks/python/apache_beam/pipeline_test.py           |  53 +---
 .../runners/dataflow/native_io/__init__.py         |  16 -
 .../runners/dataflow/native_io/iobase.py           | 342 ---------------------
 .../runners/dataflow/native_io/iobase_test.py      | 203 ------------
 .../apache_beam/runners/direct/direct_runner.py    |  17 +-
 .../runners/direct/transform_evaluator.py          |  82 +----
 .../runners/worker/bundle_processor_test.py        |   1 -
 9 files changed, 45 insertions(+), 738 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 8cee8acfebb..af58006d6e7 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -39,9 +39,9 @@ from typing import Optional
 from typing import Tuple
 
 from apache_beam import coders
+from apache_beam.io import iobase
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
-from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import Flatten
 from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
@@ -261,6 +261,7 @@ class ReadFromPubSub(PTransform):
         timestamp_attribute=timestamp_attribute)
 
   def expand(self, pvalue):
+    # TODO(BEAM-27443): Apply a proper transform rather than Read.
     pcoll = pvalue.pipeline | Read(self._source)
     pcoll.element_type = bytes
     if self.with_attributes:
@@ -423,7 +424,8 @@ def parse_subscription(full_subscription):
   return project, subscription_name
 
 
-class _PubSubSource(dataflow_io.NativeSource):
+# TODO(BEAM-27443): Remove (or repurpose as a proper PTransform).
+class _PubSubSource(iobase.SourceBase):
   """Source for a Cloud Pub/Sub topic or subscription.
 
   This ``NativeSource`` is overridden by a native Pubsub implementation.
@@ -460,11 +462,6 @@ class _PubSubSource(dataflow_io.NativeSource):
     if subscription:
       self.project, self.subscription_name = parse_subscription(subscription)
 
-  @property
-  def format(self):
-    """Source format name required for remote execution."""
-    return 'pubsub'
-
   def display_data(self):
     return {
         'id_label': DisplayDataItem(self.id_label,
@@ -480,14 +477,15 @@ class _PubSubSource(dataflow_io.NativeSource):
             label='Timestamp Attribute').drop_if_none(),
     }
 
-  def reader(self):
-    raise NotImplementedError
+  def default_output_coder(self):
+    return self.coder
 
   def is_bounded(self):
     return False
 
 
-class _PubSubSink(dataflow_io.NativeSink):
+# TODO(BEAM-27443): Remove in favor of a proper WriteToPubSub transform.
+class _PubSubSink(object):
   """Sink for a Cloud Pub/Sub topic.
 
   This ``NativeSource`` is overridden by a native Pubsub implementation.
@@ -505,14 +503,6 @@ class _PubSubSink(dataflow_io.NativeSink):
 
     self.project, self.topic_name = parse_topic(topic)
 
-  @property
-  def format(self):
-    """Sink format name required for remote execution."""
-    return 'pubsub'
-
-  def writer(self):
-    raise NotImplementedError
-
 
 class PubSubSourceDescriptor(NamedTuple):
   """A PubSub source descriptor for ``MultipleReadFromPubSub```
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 6d75d520af5..e15205ead4d 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -101,6 +101,9 @@ class SourceBase(HasDisplayData, urns.RunnerApiFn):
   """
   urns.RunnerApiFn.register_pickle_urn(python_urns.PICKLED_SOURCE)
 
+  def default_output_coder(self):
+    raise NotImplementedError
+
   def is_bounded(self):
     # type: () -> bool
     raise NotImplementedError
@@ -923,11 +926,8 @@ class Read(ptransform.PTransform):
 
   def _infer_output_coder(self, input_type=None, input_coder=None):
     # type: (...) -> Optional[coders.Coder]
-    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
-    if isinstance(self.source, BoundedSource):
+    if isinstance(self.source, SourceBase):
       return self.source.default_output_coder()
-    elif isinstance(self.source, dataflow_io.NativeSource):
-      return self.source.coder
     else:
       return None
 
@@ -941,18 +941,17 @@ class Read(ptransform.PTransform):
       self,
       context: PipelineContext,
   ) -> Tuple[str, Any]:
-    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
-    if isinstance(self.source, (BoundedSource, dataflow_io.NativeSource)):
-      from apache_beam.io.gcp.pubsub import _PubSubSource
-      if isinstance(self.source, _PubSubSource):
-        return (
-            common_urns.composites.PUBSUB_READ.urn,
-            beam_runner_api_pb2.PubSubReadPayload(
-                topic=self.source.full_topic,
-                subscription=self.source.full_subscription,
-                timestamp_attribute=self.source.timestamp_attribute,
-                with_attributes=self.source.with_attributes,
-                id_attribute=self.source.id_label))
+    from apache_beam.io.gcp.pubsub import _PubSubSource
+    if isinstance(self.source, _PubSubSource):
+      return (
+          common_urns.composites.PUBSUB_READ.urn,
+          beam_runner_api_pb2.PubSubReadPayload(
+              topic=self.source.full_topic,
+              subscription=self.source.full_subscription,
+              timestamp_attribute=self.source.timestamp_attribute,
+              with_attributes=self.source.with_attributes,
+              id_attribute=self.source.id_label))
+    if isinstance(self.source, BoundedSource):
       return (
           common_urns.deprecated_primitives.READ.urn,
           beam_runner_api_pb2.ReadPayload(
@@ -976,6 +975,7 @@ class Read(ptransform.PTransform):
     if transform.spec.urn == common_urns.composites.PUBSUB_READ.urn:
       assert isinstance(payload, beam_runner_api_pb2.PubSubReadPayload)
       # Importing locally to prevent circular dependencies.
+      # TODO(BEAM-27443): Remove the need for this.
      from apache_beam.io.gcp.pubsub import _PubSubSource
       source = _PubSubSource(
           topic=payload.topic or None,
@@ -1015,6 +1015,7 @@ ptransform.PTransform.register_urn(
     Read._from_runner_api_parameter_read,
 )
 
+# TODO(BEAM-27443): Remove.
 ptransform.PTransform.register_urn(
     common_urns.composites.PUBSUB_READ.urn,
     beam_runner_api_pb2.PubSubReadPayload,
@@ -1065,10 +1066,11 @@ class Write(ptransform.PTransform):
     return {'sink': self.sink.__class__, 'sink_dd': self.sink}
 
   def expand(self, pcoll):
-    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
-    if isinstance(self.sink, dataflow_io.NativeSink):
-      # A native sink
-      return pcoll | 'NativeWrite' >> dataflow_io._NativeWrite(self.sink)
+    # Importing locally to prevent circular dependencies.
+    from apache_beam.io.gcp.pubsub import _PubSubSink
+    if isinstance(self.sink, _PubSubSink):
+      # TODO(BEAM-27443): Remove the need for special casing here.
+      return pvalue.PDone(pcoll.pipeline)
     elif isinstance(self.sink, Sink):
       # A custom sink
       return pcoll | WriteImpl(self.sink)
@@ -1084,6 +1086,7 @@ class Write(ptransform.PTransform):
       self,
       context: PipelineContext,
   ) -> Tuple[str, Any]:
+    # TODO(BEAM-27443): Remove the need for special casing here.
     # Importing locally to prevent circular dependencies.
     from apache_beam.io.gcp.pubsub import _PubSubSink
     if isinstance(self.sink, _PubSubSink):
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 18ab0f091aa..c9ac4ce4c13 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -30,7 +30,7 @@ import apache_beam as beam
 from apache_beam import typehints
 from apache_beam.coders import BytesCoder
 from apache_beam.io import Read
-from apache_beam.metrics import Metrics
+from apache_beam.io.iobase import SourceBase
 from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
@@ -40,7 +40,6 @@ from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.pvalue import AsSingleton
 from apache_beam.pvalue import TaggedOutput
-from apache_beam.runners.dataflow.native_io.iobase import NativeSource
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -61,39 +60,9 @@ from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
-# TODO(BEAM-1555): Test is failing on the service, with FakeSource.
-
-class FakeSource(NativeSource):
-  """Fake source returning a fixed list of values."""
-  class _Reader(object):
-    def __init__(self, vals):
-      self._vals = vals
-      self._output_counter = Metrics.counter('main', 'outputs')
-
-    def __enter__(self):
-      return self
-
-    def __exit__(self, exception_type, exception_value, traceback):
-      pass
-
-    def __iter__(self):
-      for v in self._vals:
-        self._output_counter.inc()
-        yield v
-
-  def __init__(self, vals):
-    self._vals = vals
-
-  def reader(self):
-    return FakeSource._Reader(self._vals)
-
-
-class FakeUnboundedSource(NativeSource):
+class FakeUnboundedSource(SourceBase):
   """Fake unbounded source. Does not work at runtime"""
-  def reader(self):
-    return None
-
   def is_bounded(self):
     return False
 
@@ -259,24 +228,6 @@ class PipelineTest(unittest.TestCase):
     pcoll = pipeline | 'label' >> Create([[1, 2, 3]])
     assert_that(pcoll, equal_to([[1, 2, 3]]))
 
-  # TODO(BEAM-1555): Test is failing on the service, with FakeSource.
-  # @pytest.mark.it_validatesrunner
-  def test_metrics_in_fake_source(self):
-    pipeline = TestPipeline()
-    pcoll = pipeline | Read(FakeSource([1, 2, 3, 4, 5, 6]))
-    assert_that(pcoll, equal_to([1, 2, 3, 4, 5, 6]))
-    res = pipeline.run()
-    metric_results = res.metrics().query()
-    outputs_counter = metric_results['counters'][0]
-    self.assertEqual(outputs_counter.key.step, 'Read')
-    self.assertEqual(outputs_counter.key.metric.name, 'outputs')
-    self.assertEqual(outputs_counter.committed, 6)
-
-  def test_fake_read(self):
-    with TestPipeline() as pipeline:
-      pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3]))
-      assert_that(pcoll, equal_to([1, 2, 3]))
-
   def test_visit_entire_graph(self):
     pipeline = Pipeline()
     pcoll1 = pipeline | 'pcoll' >> beam.Impulse()
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py b/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
deleted file mode 100644
index cce3acad34a..00000000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
deleted file mode 100644
index 3d1afe54690..00000000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ /dev/null
@@ -1,342 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Dataflow native sources and sinks.
-
-For internal use only; no backwards-compatibility guarantees.
-"""
-
-# pytype: skip-file
-
-import logging
-from typing import TYPE_CHECKING
-from typing import Optional
-
-from apache_beam import pvalue
-from apache_beam.io import iobase
-from apache_beam.transforms import ptransform
-from apache_beam.transforms.display import HasDisplayData
-
-if TYPE_CHECKING:
-  from apache_beam import coders
-
-_LOGGER = logging.getLogger(__name__)
-
-
-def _dict_printable_fields(dict_object, skip_fields):
-  """Returns a list of strings for the interesting fields of a dict."""
-  return [
-      '%s=%r' % (name, value) for name,
-      value in dict_object.items()
-      # want to output value 0 but not None nor []
-      if (value or value == 0) and name not in skip_fields
-  ]
-
-
-_minor_fields = [
-    'coder',
-    'key_coder',
-    'value_coder',
-    'config_bytes',
-    'elements',
-    'append_trailing_newlines',
-    'strip_trailing_newlines',
-    'compression_type'
-]
-
-
-class NativeSource(iobase.SourceBase):
-  """A source implemented by Dataflow service.
-
-  This class is to be only inherited by sources natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-
-  This class is deprecated and should not be used to define new sources.
-  """
-  coder = None  # type: Optional[coders.Coder]
-
-  def reader(self):
-    """Returns a NativeSourceReader instance associated with this source."""
-    raise NotImplementedError
-
-  def is_bounded(self):
-    return True
-
-  def __repr__(self):
-    return '<{name} {vals}>'.format(
-        name=self.__class__.__name__,
-        vals=', '.join(_dict_printable_fields(self.__dict__, _minor_fields)))
-
-
-class NativeSourceReader(object):
-  """A reader for a source implemented by Dataflow service."""
-  def __enter__(self):
-    """Opens everything necessary for a reader to function properly."""
-    raise NotImplementedError
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    """Cleans up after a reader executed."""
-    raise NotImplementedError
-
-  def __iter__(self):
-    """Returns an iterator over all the records of the source."""
-    raise NotImplementedError
-
-  @property
-  def returns_windowed_values(self):
-    """Returns whether this reader returns windowed values."""
-    return False
-
-  def get_progress(self):
-    """Returns a representation of how far the reader has read.
-
-    Returns:
-      A SourceReaderProgress object that gives the current progress of the
-      reader.
-    """
-
-  def request_dynamic_split(self, dynamic_split_request):
-    """Attempts to split the input in two parts.
-
-    The two parts are named the "primary" part and the "residual" part. The
-    current 'NativeSourceReader' keeps processing the primary part, while the
-    residual part will be processed elsewhere (e.g. perhaps on a different
-    worker).
-
-    The primary and residual parts, if concatenated, must represent the
-    same input as the current input of this 'NativeSourceReader' before this
-    call.
-
-    The boundary between the primary part and the residual part is
-    specified in a framework-specific way using 'DynamicSplitRequest' e.g.,
-    if the framework supports the notion of positions, it might be a
-    position at which the input is asked to split itself (which is not
-    necessarily the same position at which it *will* split itself); it
-    might be an approximate fraction of input, or something else.
-
-    This function returns a 'DynamicSplitResult', which encodes, in a
-    framework-specific way, the information sufficient to construct a
-    description of the resulting primary and residual inputs. For example, it
-    might, again, be a position demarcating these parts, or it might be a pair
-    of fully-specified input descriptions, or something else.
-
-    After a successful call to 'request_dynamic_split()', subsequent calls
-    should be interpreted relative to the new primary.
-
-    Args:
-      dynamic_split_request: A 'DynamicSplitRequest' describing the split
-        request.
-
-    Returns:
-      'None' if the 'DynamicSplitRequest' cannot be honored (in that
-      case the input represented by this 'NativeSourceReader' stays the same),
-      or a 'DynamicSplitResult' describing how the input was split into a
-      primary and residual part.
-    """
-    _LOGGER.debug(
-        'SourceReader %r does not support dynamic splitting. Ignoring dynamic '
-        'split request: %r',
-        self,
-        dynamic_split_request)
-
-
-class ReaderProgress(object):
-  """A representation of how far a NativeSourceReader has read."""
-  def __init__(
-      self,
-      position=None,
-      percent_complete=None,
-      remaining_time=None,
-      consumed_split_points=None,
-      remaining_split_points=None):
-
-    self._position = position
-
-    if percent_complete is not None:
-      percent_complete = float(percent_complete)
-      if percent_complete < 0 or percent_complete > 1:
-        raise ValueError(
-            'The percent_complete argument was %f. Must be in range [0, 1].' %
-            percent_complete)
-    self._percent_complete = percent_complete
-
-    self._remaining_time = remaining_time
-    self._consumed_split_points = consumed_split_points
-    self._remaining_split_points = remaining_split_points
-
-  @property
-  def position(self):
-    """Returns progress, represented as a ReaderPosition object."""
-    return self._position
-
-  @property
-  def percent_complete(self):
-    """Returns progress, represented as a percentage of total work.
-
-    Progress range from 0.0 (beginning, nothing complete) to 1.0 (end of the
-    work range, entire WorkItem complete).
-
-    Returns:
-      Progress represented as a percentage of total work.
-    """
-    return self._percent_complete
-
-  @property
-  def remaining_time(self):
-    """Returns progress, represented as an estimated time remaining."""
-    return self._remaining_time
-
-  @property
-  def consumed_split_points(self):
-    return self._consumed_split_points
-
-  @property
-  def remaining_split_points(self):
-    return self._remaining_split_points
-
-
-class ReaderPosition(object):
-  """A representation of position in an iteration of a 'NativeSourceReader'."""
-  def __init__(
-      self,
-      end=None,
-      key=None,
-      byte_offset=None,
-      record_index=None,
-      shuffle_position=None,
-      concat_position=None):
-    """Initializes ReaderPosition.
-
-    A ReaderPosition may get instantiated for one of these position types. Only
-    one of these should be specified.
-
-    Args:
-      end: position is past all other positions. For example, this may be used
-        to represent the end position of an unbounded range.
-      key: position is a string key.
-      byte_offset: position is a byte offset.
-      record_index: position is a record index
-      shuffle_position: position is a base64 encoded shuffle position.
-      concat_position: position is a 'ConcatPosition'.
-    """
-
-    self.end = end
-    self.key = key
-    self.byte_offset = byte_offset
-    self.record_index = record_index
-    self.shuffle_position = shuffle_position
-
-    if concat_position is not None:
-      assert isinstance(concat_position, ConcatPosition)
-    self.concat_position = concat_position
-
-
-class ConcatPosition(object):
-  """A position that encapsulate an inner position and an index.
-
-  This is used to represent the position of a source that encapsulate several
-  other sources.
-  """
-  def __init__(self, index, position):
-    """Initializes ConcatPosition.
-
-    Args:
-      index: index of the source currently being read.
-      position: inner position within the source currently being read.
-    """
-
-    if position is not None:
-      assert isinstance(position, ReaderPosition)
-    self.index = index
-    self.position = position
-
-
-class DynamicSplitRequest(object):
-  """Specifies how 'NativeSourceReader.request_dynamic_split' should split.
-  """
-  def __init__(self, progress):
-    assert isinstance(progress, ReaderProgress)
-    self.progress = progress
-
-
-class DynamicSplitResult(object):
-  pass
-
-
-class DynamicSplitResultWithPosition(DynamicSplitResult):
-  def __init__(self, stop_position):
-    assert isinstance(stop_position, ReaderPosition)
-    self.stop_position = stop_position
-
-
-class NativeSink(HasDisplayData):
-  """A sink implemented by Dataflow service.
-
-  This class is to be only inherited by sinks natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-  """
-  def writer(self):
-    """Returns a SinkWriter for this source."""
-    raise NotImplementedError
-
-  def __repr__(self):
-    return '<{name} {vals}>'.format(
-        name=self.__class__.__name__,
-        vals=_dict_printable_fields(self.__dict__, _minor_fields))
-
-
-class NativeSinkWriter(object):
-  """A writer for a sink implemented by Dataflow service."""
-  def __enter__(self):
-    """Opens everything necessary for a writer to function properly."""
-    raise NotImplementedError
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    """Cleans up after a writer executed."""
-    raise NotImplementedError
-
-  @property
-  def takes_windowed_values(self):
-    """Returns whether this writer takes windowed values."""
-    return False
-
-  def Write(self, o):  # pylint: disable=invalid-name
-    """Writes a record to the sink associated with this writer."""
-    raise NotImplementedError
-
-
-class _NativeWrite(ptransform.PTransform):
-  """A PTransform for writing to a Dataflow native sink.
-
-  These are sinks that are implemented natively by the Dataflow service
-  and hence should not be updated by users. These sinks are processed
-  using a Dataflow native write transform.
-
-  Applying this transform results in a ``pvalue.PDone``.
-  """
-  def __init__(self, sink):
-    """Initializes a Write transform.
-
-    Args:
-      sink: Sink to use for the write
-    """
-    super().__init__()
-    self.sink = sink
-
-  def expand(self, pcoll):
-    self._check_pcollection(pcoll)
-    return pvalue.PDone(pcoll.pipeline)
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
deleted file mode 100644
index 5e72ca555b6..00000000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests corresponding to Dataflow's iobase module."""
-
-# pytype: skip-file
-
-import unittest
-
-from apache_beam import Create
-from apache_beam import error
-from apache_beam import pvalue
-from apache_beam.runners.dataflow.native_io.iobase import ConcatPosition
-from apache_beam.runners.dataflow.native_io.iobase import DynamicSplitRequest
-from apache_beam.runners.dataflow.native_io.iobase import DynamicSplitResultWithPosition
-from apache_beam.runners.dataflow.native_io.iobase import NativeSink
-from apache_beam.runners.dataflow.native_io.iobase import NativeSinkWriter
-from apache_beam.runners.dataflow.native_io.iobase import NativeSource
-from apache_beam.runners.dataflow.native_io.iobase import ReaderPosition
-from apache_beam.runners.dataflow.native_io.iobase import ReaderProgress
-from apache_beam.runners.dataflow.native_io.iobase import _dict_printable_fields
-from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite
-from apache_beam.testing.test_pipeline import TestPipeline
-
-
-class TestHelperFunctions(unittest.TestCase):
-  def test_dict_printable_fields(self):
-    dict_object = {
-        'key_alpha': '1',
-        'key_beta': None,
-        'key_charlie': [],
-        'key_delta': 2.0,
-        'key_echo': 'skip_me',
-        'key_fox': 0
-    }
-    skip_fields = [
-        'key_echo',
-    ]
-    self.assertEqual(
-        sorted(_dict_printable_fields(dict_object, skip_fields)),
-        ["key_alpha='1'", 'key_delta=2.0', 'key_fox=0'])
-
-
-class TestNativeSource(unittest.TestCase):
-  def test_reader_method(self):
-    native_source = NativeSource()
-    self.assertRaises(NotImplementedError, native_source.reader)
-
-  def test_repr_method(self):
-    class FakeSource(NativeSource):
-      """A fake source modeled after BigQuerySource, which inherits from
-      NativeSource."""
-      def __init__(
-          self,
-          table=None,
-          dataset=None,
-          project=None,
-          query=None,
-          validate=False,
-          coder=None,
-          use_std_sql=False,
-          flatten_results=True):
-        self.validate = validate
-
-    fake_source = FakeSource()
-    self.assertEqual(fake_source.__repr__(), '<FakeSource validate=False>')
-
-
-class TestReaderProgress(unittest.TestCase):
-  def test_out_of_bounds_percent_complete(self):
-    with self.assertRaises(ValueError):
-      ReaderProgress(percent_complete=-0.1)
-    with self.assertRaises(ValueError):
-      ReaderProgress(percent_complete=1.1)
-
-  def test_position_property(self):
-    reader_progress = ReaderProgress(position=ReaderPosition())
-    self.assertEqual(type(reader_progress.position), ReaderPosition)
-
-  def test_percent_complete_property(self):
-    reader_progress = ReaderProgress(percent_complete=0.5)
-    self.assertEqual(reader_progress.percent_complete, 0.5)
-
-
-class TestReaderPosition(unittest.TestCase):
-  def test_invalid_concat_position_type(self):
-    with self.assertRaises(AssertionError):
-      ReaderPosition(concat_position=1)
-
-  def test_valid_concat_position_type(self):
-    ReaderPosition(concat_position=ConcatPosition(None, None))
-
-
-class TestConcatPosition(unittest.TestCase):
-  def test_invalid_position_type(self):
-    with self.assertRaises(AssertionError):
-      ConcatPosition(None, position=1)
-
-  def test_valid_position_type(self):
-    ConcatPosition(None, position=ReaderPosition())
-
-
-class TestDynamicSplitRequest(unittest.TestCase):
-  def test_invalid_progress_type(self):
-    with self.assertRaises(AssertionError):
-      DynamicSplitRequest(progress=1)
-
-  def test_valid_progress_type(self):
-    DynamicSplitRequest(progress=ReaderProgress())
-
-
-class TestDynamicSplitResultWithPosition(unittest.TestCase):
-  def test_invalid_stop_position_type(self):
-    with self.assertRaises(AssertionError):
-      DynamicSplitResultWithPosition(stop_position=1)
-
-  def test_valid_stop_position_type(self):
-    DynamicSplitResultWithPosition(stop_position=ReaderPosition())
-
-
-class TestNativeSink(unittest.TestCase):
-  def test_writer_method(self):
-    native_sink = NativeSink()
-    self.assertRaises(NotImplementedError, native_sink.writer)
-
-  def test_repr_method(self):
-    class FakeSink(NativeSink):
-      """A fake sink modeled after BigQuerySink, which inherits from
-      NativeSink."""
-      def __init__(
-          self,
-          validate=False,
-          dataset=None,
-          project=None,
-          schema=None,
-          create_disposition='create',
-          write_disposition=None,
-          coder=None):
-        self.validate = validate
-
-    fake_sink = FakeSink()
-    self.assertEqual(fake_sink.__repr__(), "<FakeSink ['validate=False']>")
-
-  def test_on_direct_runner(self):
-    class FakeSink(NativeSink):
-      """A fake sink outputing a number of elements."""
-      def __init__(self):
-        self.written_values = []
-        self.writer_instance = FakeSinkWriter(self.written_values)
-
-      def writer(self):
-        return self.writer_instance
-
-    class FakeSinkWriter(NativeSinkWriter):
-      """A fake sink writer for testing."""
-      def __init__(self, written_values):
-        self.written_values = written_values
-
-      def __enter__(self):
-        return self
-
-      def __exit__(self, *unused_args):
-        pass
-
-      def Write(self, value):
-        self.written_values.append(value)
-
-    with TestPipeline() as p:
-      sink = FakeSink()
-      p | Create(['a', 'b', 'c']) | _NativeWrite(sink)  # pylint: disable=expression-not-assigned
-
-    self.assertEqual(['a', 'b', 'c'], sorted(sink.written_values))
-
-
-class Test_NativeWrite(unittest.TestCase):
-  def setUp(self):
-    self.native_sink = NativeSink()
-    self.native_write = _NativeWrite(self.native_sink)
-
-  def test_expand_method_pcollection_errors(self):
-    with self.assertRaises(error.TransformError):
-      self.native_write.expand(None)
-    with self.assertRaises(error.TransformError):
-      pcoll = pvalue.PCollection(pipeline=None)
-      self.native_write.expand(pcoll)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 6466bc7752b..db53e4122bb 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -72,9 +72,9 @@ class SwitchingDirectRunner(PipelineRunner):
 
   def run_pipeline(self, pipeline, options):
     from apache_beam.pipeline import PipelineVisitor
-    from apache_beam.runners.dataflow.native_io.iobase import NativeSource
-    from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite
     from apache_beam.testing.test_stream import TestStream
+    from apache_beam.io.gcp.pubsub import ReadFromPubSub
+    from apache_beam.io.gcp.pubsub import WriteToPubSub
 
     class _FnApiRunnerSupportVisitor(PipelineVisitor):
       """Visitor determining if a Pipeline can be run on the FnApiRunner."""
@@ -83,18 +83,17 @@ class SwitchingDirectRunner(PipelineRunner):
         pipeline.visit(self)
         return self.supported_by_fnapi_runner
 
+      def enter_composite_transform(self, applied_ptransform):
+        # The FnApiRunner does not support streaming execution.
+        if isinstance(applied_ptransform.transform,
+                      (ReadFromPubSub, WriteToPubSub)):
+          self.supported_by_fnapi_runner = False
+
       def visit_transform(self, applied_ptransform):
         transform = applied_ptransform.transform
         # The FnApiRunner does not support streaming execution.
         if isinstance(transform, TestStream):
           self.supported_by_fnapi_runner = False
-        # The FnApiRunner does not support reads from NativeSources.
-        if (isinstance(transform, beam.io.Read) and
-            isinstance(transform.source, NativeSource)):
-          self.supported_by_fnapi_runner = False
-        # The FnApiRunner does not support the use of _NativeWrites.
-        if isinstance(transform, _NativeWrite):
-          self.supported_by_fnapi_runner = False
         if isinstance(transform, beam.ParDo):
           dofn = transform.dofn
           # The FnApiRunner does not support execution of CombineFns with
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index bfb27c4adc0..37004c7258a 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -39,7 +39,6 @@ from apache_beam.internal import pickler
 from apache_beam.runners import common
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
-from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
 from apache_beam.runners.direct.direct_runner import _GroupByKeyOnly
 from apache_beam.runners.direct.direct_runner import _StreamingGroupAlsoByWindow
@@ -106,7 +105,6 @@ class TransformEvaluatorRegistry(object):
         _GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
         _StreamingGroupByKeyOnly: _StreamingGroupByKeyOnlyEvaluator,
         _StreamingGroupAlsoByWindow: _StreamingGroupAlsoByWindowEvaluator,
-        _NativeWrite: _NativeWriteEvaluator,
         _TestStream: _TestStreamEvaluator,
         ProcessElements: _ProcessElementsEvaluator,
         _WatermarkController: _WatermarkControllerEvaluator,
@@ -172,11 +170,10 @@ class TransformEvaluatorRegistry(object):
     Returns:
       True if executor should execute applied_ptransform serially.
     """
-    if isinstance(applied_ptransform.transform,
-                  (_GroupByKeyOnly,
-                   _StreamingGroupByKeyOnly,
-                   _StreamingGroupAlsoByWindow,
-                   _NativeWrite)):
+    if isinstance(
+        applied_ptransform.transform,
+        (_GroupByKeyOnly, _StreamingGroupByKeyOnly,
+         _StreamingGroupAlsoByWindow)):
       return True
     elif (isinstance(applied_ptransform.transform, core.ParDo) and
           is_stateful_dofn(applied_ptransform.transform.dofn)):
@@ -1125,77 +1122,6 @@ class _StreamingGroupAlsoByWindowEvaluator(_TransformEvaluator):
     return TransformResult(self, bundles, [], None, self.keyed_holds)
 
 
-class _NativeWriteEvaluator(_TransformEvaluator):
-  """TransformEvaluator for _NativeWrite transform."""
-
-  ELEMENTS_TAG = _ListStateTag('elements')
-
-  def __init__(
-      self,
-      evaluation_context,
-      applied_ptransform,
-      input_committed_bundle,
-      side_inputs):
-    assert not side_inputs
-    super().__init__(
-        evaluation_context,
-        applied_ptransform,
-        input_committed_bundle,
-        side_inputs)
-
-    assert applied_ptransform.transform.sink
-    self._sink = applied_ptransform.transform.sink
-
-  @property
-  def _is_final_bundle(self):
-    return (
-        self._execution_context.watermarks.input_watermark ==
-        WatermarkManager.WATERMARK_POS_INF)
-
-  @property
-  def _has_already_produced_output(self):
-    return (
-        self._execution_context.watermarks.output_watermark ==
-        WatermarkManager.WATERMARK_POS_INF)
-
-  def start_bundle(self):
-    self.global_state = self._step_context.get_keyed_state(None)
-
-  def process_timer(self, timer_firing):
-    # We do not need to emit a KeyedWorkItem to process_element().
-    pass
-
-  def process_element(self, element):
-    self.global_state.add_state(
-        None, _NativeWriteEvaluator.ELEMENTS_TAG, element)
-
-  def finish_bundle(self):
-    # finish_bundle will append incoming bundles in memory until all the bundles
-    # carrying data is processed. This is done to produce only a single output
-    # shard (some tests depends on this behavior). It is possible to have
-    # incoming empty bundles after the output is produced, these bundles will be
-    # ignored and would not generate additional output files.
-    # TODO(altay): Do not wait until the last bundle to write in a single shard.
-    if self._is_final_bundle:
-      elements = self.global_state.get_state(
-          None, _NativeWriteEvaluator.ELEMENTS_TAG)
-      if self._has_already_produced_output:
-        # Ignore empty bundles that arrive after the output is produced.
-        assert elements == []
-      else:
-        self._sink.pipeline_options = self._evaluation_context.pipeline_options
-        with self._sink.writer() as writer:
-          for v in elements:
-            writer.Write(v.value)
-      hold = WatermarkManager.WATERMARK_POS_INF
-    else:
-      hold = WatermarkManager.WATERMARK_NEG_INF
-      self.global_state.set_timer(
-          None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
-
-    return TransformResult(self, [], [], None, {None: hold})
-
-
 class _ProcessElementsEvaluator(_TransformEvaluator):
   """An evaluator for sdf_direct_runner.ProcessElements transform."""
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
index 8b81c9f17ac..8072089f2ac 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py
@@ -20,7 +20,6 @@
 import unittest
 
 from typing import Dict
-from typing import List
 
 import apache_beam as beam
 from apache_beam.coders.coders import FastPrimitivesCoder
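
Editor's note: for downstream code that subclassed the removed NativeSource, the replacement pattern this patch itself uses (see the pipeline_test.py hunk above) is to subclass apache_beam.io.iobase.SourceBase directly. Below is a minimal sketch, not part of the patch; it assumes only what the diff shows (is_bounded() and the new default_output_coder() hook on SourceBase), and the class name and coder choice are illustrative:

    # Sketch of a stub source after this change; mirrors the
    # FakeUnboundedSource rewrite in pipeline_test.py above.
    from apache_beam.coders import coders
    from apache_beam.io import iobase

    class MyUnboundedStubSource(iobase.SourceBase):
      """Stub source: declares itself unbounded and names its output coder."""
      def is_bounded(self):
        # NativeSource.is_bounded() defaulted to True; SourceBase requires
        # subclasses to answer explicitly.
        return False

      def default_output_coder(self):
        # SourceBase.default_output_coder() now raises NotImplementedError by
        # default (see the iobase.py hunk above); override it to pick a coder.
        return coders.BytesCoder()

As in the patch, such a stub is only useful for graph-construction tests; it does not execute at runtime.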