Remove support for NativeSinks from the Python DirectRunner

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f2e30886
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f2e30886
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f2e30886

Branch: refs/heads/gearpump-runner
Commit: f2e3088633fef10f19bfd11ff9b508930916a740
Parents: 32f22b7
Author: Charles Chen <c...@google.com>
Authored: Wed Jun 7 17:00:57 2017 -0700
Committer: Charles Chen <c...@google.com>
Committed: Wed Jun 7 17:01:33 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/transform_evaluator.py       | 62 +-------------------
 1 file changed, 1 insertion(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f2e30886/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index b1cb626..0fec8b8 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -29,7 +29,6 @@ from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
 from apache_beam.runners.direct.transform_result import TransformResult
-from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
 from apache_beam.transforms import core
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
@@ -54,7 +53,6 @@ class TransformEvaluatorRegistry(object):
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
-        _NativeWrite: _NativeWriteEvaluator,
     }
 
   def for_application(
@@ -98,8 +96,7 @@ class TransformEvaluatorRegistry(object):
     Returns:
       True if executor should execute applied_ptransform serially.
     """
-    return isinstance(applied_ptransform.transform,
-                      (core._GroupByKeyOnly, _NativeWrite))
+    return isinstance(applied_ptransform.transform, core._GroupByKeyOnly)
 
 
 class _TransformEvaluator(object):
@@ -403,60 +400,3 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
 
     return TransformResult(
         self._applied_ptransform, bundles, state, None, None, hold)
-
-
-class _NativeWriteEvaluator(_TransformEvaluator):
-  """TransformEvaluator for _NativeWrite transform."""
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not side_inputs
-    super(_NativeWriteEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-    assert applied_ptransform.transform.sink
-    self._sink = applied_ptransform.transform.sink
-
-  @property
-  def _is_final_bundle(self):
-    return (self._execution_context.watermarks.input_watermark
-            == WatermarkManager.WATERMARK_POS_INF)
-
-  @property
-  def _has_already_produced_output(self):
-    return (self._execution_context.watermarks.output_watermark
-            == WatermarkManager.WATERMARK_POS_INF)
-
-  def start_bundle(self):
-    # state: [values]
-    self.state = (self._execution_context.existing_state
-                  if self._execution_context.existing_state else [])
-
-  def process_element(self, element):
-    self.state.append(element)
-
-  def finish_bundle(self):
-    # finish_bundle will append incoming bundles in memory until all the bundles
-    # carrying data is processed. This is done to produce only a single output
-    # shard (some tests depends on this behavior). It is possible to have
-    # incoming empty bundles after the output is produced, these bundles will be
-    # ignored and would not generate additional output files.
-    # TODO(altay): Do not wait until the last bundle to write in a single shard.
-    if self._is_final_bundle:
-      if self._has_already_produced_output:
-        # Ignore empty bundles that arrive after the output is produced.
-        assert self.state == []
-      else:
-        self._sink.pipeline_options = self._evaluation_context.pipeline_options
-        with self._sink.writer() as writer:
-          for v in self.state:
-            writer.Write(v.value)
-      state = None
-      hold = WatermarkManager.WATERMARK_POS_INF
-    else:
-      state = self.state
-      hold = WatermarkManager.WATERMARK_NEG_INF
-
-    return TransformResult(
-        self._applied_ptransform, [], state, None, None, hold)
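
For readers unfamiliar with the removed evaluator: _NativeWriteEvaluator buffered every element in evaluator state and only invoked the sink's writer once the input watermark reached positive infinity, so that exactly one output shard was produced. The standalone sketch below illustrates that buffering pattern under simplified assumptions; it is not Beam API, and the names ListSink and BufferingWriteEvaluator are hypothetical.

# Minimal, self-contained sketch of the buffering pattern used by the removed
# _NativeWriteEvaluator. Not Beam code; ListSink and BufferingWriteEvaluator
# are illustrative names only.
import contextlib

WATERMARK_POS_INF = float('inf')
WATERMARK_NEG_INF = float('-inf')


class ListSink(object):
  """Stand-in for a native sink; collects written values in a list."""

  def __init__(self):
    self.written = []

  @contextlib.contextmanager
  def writer(self):
    yield self

  def Write(self, value):
    self.written.append(value)


class BufferingWriteEvaluator(object):
  """Buffers elements until the final bundle, then writes a single shard."""

  def __init__(self, sink):
    self._sink = sink
    self._state = []  # carried across bundles, like the evaluator's state

  def process_bundle(self, elements, input_watermark):
    self._state.extend(elements)
    if input_watermark == WATERMARK_POS_INF:
      # Final bundle: write everything in one shard.
      with self._sink.writer() as writer:
        for value in self._state:
          writer.Write(value)
      self._state = []
      return WATERMARK_POS_INF  # release the output watermark hold
    # Not final yet: keep buffering and hold back the output watermark.
    return WATERMARK_NEG_INF


if __name__ == '__main__':
  sink = ListSink()
  evaluator = BufferingWriteEvaluator(sink)
  evaluator.process_bundle(['a', 'b'], input_watermark=0)
  evaluator.process_bundle(['c'], input_watermark=WATERMARK_POS_INF)
  print(sink.written)  # ['a', 'b', 'c'], produced only on the final bundle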
