Remove support for NativeSinks from the Python DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f2e30886 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f2e30886 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f2e30886 Branch: refs/heads/gearpump-runner Commit: f2e3088633fef10f19bfd11ff9b508930916a740 Parents: 32f22b7 Author: Charles Chen <c...@google.com> Authored: Wed Jun 7 17:00:57 2017 -0700 Committer: Charles Chen <c...@google.com> Committed: Wed Jun 7 17:01:33 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/transform_evaluator.py | 62 +------------------- 1 file changed, 1 insertion(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f2e30886/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index b1cb626..0fec8b8 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -29,7 +29,6 @@ from apache_beam.runners.common import DoFnRunner from apache_beam.runners.common import DoFnState from apache_beam.runners.direct.watermark_manager import WatermarkManager from apache_beam.runners.direct.transform_result import TransformResult -from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access from apache_beam.transforms import core from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue @@ -54,7 +53,6 @@ class TransformEvaluatorRegistry(object): core.Flatten: _FlattenEvaluator, core.ParDo: _ParDoEvaluator, core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator, - _NativeWrite: _NativeWriteEvaluator, } def for_application( @@ -98,8 +96,7 @@ class TransformEvaluatorRegistry(object): Returns: True if executor should execute applied_ptransform serially. """ - return isinstance(applied_ptransform.transform, - (core._GroupByKeyOnly, _NativeWrite)) + return isinstance(applied_ptransform.transform, core._GroupByKeyOnly) class _TransformEvaluator(object): @@ -403,60 +400,3 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): return TransformResult( self._applied_ptransform, bundles, state, None, None, hold) - - -class _NativeWriteEvaluator(_TransformEvaluator): - """TransformEvaluator for _NativeWrite transform.""" - - def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs, scoped_metrics_container): - assert not side_inputs - super(_NativeWriteEvaluator, self).__init__( - evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs, scoped_metrics_container) - - assert applied_ptransform.transform.sink - self._sink = applied_ptransform.transform.sink - - @property - def _is_final_bundle(self): - return (self._execution_context.watermarks.input_watermark - == WatermarkManager.WATERMARK_POS_INF) - - @property - def _has_already_produced_output(self): - return (self._execution_context.watermarks.output_watermark - == WatermarkManager.WATERMARK_POS_INF) - - def start_bundle(self): - # state: [values] - self.state = (self._execution_context.existing_state - if self._execution_context.existing_state else []) - - def process_element(self, element): - self.state.append(element) - - def finish_bundle(self): - # finish_bundle will append incoming bundles in memory until all the bundles - # carrying data is processed. This is done to produce only a single output - # shard (some tests depends on this behavior). It is possible to have - # incoming empty bundles after the output is produced, these bundles will be - # ignored and would not generate additional output files. - # TODO(altay): Do not wait until the last bundle to write in a single shard. - if self._is_final_bundle: - if self._has_already_produced_output: - # Ignore empty bundles that arrive after the output is produced. - assert self.state == [] - else: - self._sink.pipeline_options = self._evaluation_context.pipeline_options - with self._sink.writer() as writer: - for v in self.state: - writer.Write(v.value) - state = None - hold = WatermarkManager.WATERMARK_POS_INF - else: - state = self.state - hold = WatermarkManager.WATERMARK_NEG_INF - - return TransformResult( - self._applied_ptransform, [], state, None, None, hold)