Renames InprocessPipelineRunner to DirectPipelineRunner and removes the
existing DirectPipelineRunner. Renames the folder to direct to keep all
related files in the same place, and drops the inprocess prefix from
file names, class names, and comments.

Removes DiskCachedPipelineRunner: it was unused, and existed only as a
temporary workaround for the high memory usage of the old
DirectPipelineRunner. The new DirectPipelineRunner does not have the same
memory footprint.

Moves EagerPipelineRunner to the direct folder and changes it to use the
new DirectPipelineRunner.
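
For illustration, a minimal run under the renamed runner (a sketch, assuming
the Python SDK API at this commit, where runners are still selected by name):

    import apache_beam as beam

    # 'DirectPipelineRunner' still names the default local runner; only the
    # implementation behind the name changed to the former in-process one.
    p = beam.Pipeline('DirectPipelineRunner')
    (p
     | 'create' >> beam.Create([1, 2, 3])
     | 'square' >> beam.Map(lambda x: x * x))
    p.run()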


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f201cbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f201cbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f201cbb

Branch: refs/heads/python-sdk
Commit: 7f201cbb7de8f69473cb710dce768c7285adbded
Parents: 93a95d6
Author: Ahmet Altay <al...@google.com>
Authored: Fri Oct 28 14:47:48 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Mon Nov 7 17:56:49 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py   |   7 +-
 sdks/python/apache_beam/io/source_test_utils.py |  14 +-
 sdks/python/apache_beam/pipeline_test.py        |  78 ---
 sdks/python/apache_beam/runners/__init__.py     |   3 +-
 .../apache_beam/runners/direct/__init__.py      |  19 +
 .../runners/direct/bundle_factory.py            | 102 ++++
 sdks/python/apache_beam/runners/direct/clock.py |  50 ++
 .../consumer_tracking_pipeline_visitor.py       |  59 ++
 .../consumer_tracking_pipeline_visitor_test.py  | 122 ++++
 .../apache_beam/runners/direct/direct_runner.py | 158 ++++++
 .../runners/direct/evaluation_context.py        | 272 +++++++++
 .../apache_beam/runners/direct/executor.py      | 550 +++++++++++++++++++
 .../runners/direct/transform_evaluator.py       | 542 ++++++++++++++++++
 .../runners/direct/transform_result.py          |  60 ++
 .../runners/direct/watermark_manager.py         | 224 ++++++++
 .../python/apache_beam/runners/direct_runner.py | 308 -----------
 .../apache_beam/runners/inprocess/__init__.py   |  19 -
 .../runners/inprocess/bundle_factory.py         | 102 ----
 .../apache_beam/runners/inprocess/clock.py      |  50 --
 .../consumer_tracking_pipeline_visitor.py       |  59 --
 .../consumer_tracking_pipeline_visitor_test.py  | 122 ----
 .../inprocess/inprocess_evaluation_context.py   | 272 ---------
 .../runners/inprocess/inprocess_executor.py     | 550 -------------------
 .../runners/inprocess/inprocess_runner.py       | 142 -----
 .../runners/inprocess/inprocess_runner_test.py  | 121 ----
 .../inprocess/inprocess_transform_result.py     |  60 --
 .../inprocess/inprocess_watermark_manager.py    | 224 --------
 .../runners/inprocess/transform_evaluator.py    | 542 ------------------
 sdks/python/apache_beam/runners/runner.py       |   5 +-
 sdks/python/tox.ini                             |   1 +
 30 files changed, 2181 insertions(+), 2656 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 77d8840..d9b907c 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,7 +26,8 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
 import random
-
+import threading
+import weakref
 from multiprocessing.pool import ThreadPool
 
 from apache_beam.internal import pickler
@@ -132,6 +133,10 @@ class FileBasedSource(iobase.BoundedSource):
     elif len(file_names) == 1:
       return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
     else:
+      # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
+      # child thread. (http://bugs.python.org/issue10015)
+      if not hasattr(threading.current_thread(), '_children'):
+        threading.current_thread()._children = weakref.WeakKeyDictionary()
       pool = ThreadPool(
           min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
       try:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 480a95d..f5e599f 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -46,6 +46,8 @@ For example usages, see the unit tests of modules such as
 
 from collections import namedtuple
 import logging
+import threading
+import weakref
 
 from multiprocessing.pool import ThreadPool
 from apache_beam.io import iobase
@@ -87,6 +89,14 @@ def readFromSource(source, start_position=None, stop_position=None):
   return values
 
 
+def _ThreadPool(threads):
+  # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
+  # child thread. (http://bugs.python.org/issue10015)
+  if not hasattr(threading.current_thread(), '_children'):
+    threading.current_thread()._children = weakref.WeakKeyDictionary()
+  return ThreadPool(threads)
+
+
 def assertSourcesEqualReferenceSource(reference_source_info, sources_info):
   """Tests if a reference source is equal to a given set of sources.
 
@@ -545,7 +555,7 @@ def assertSplitAtFractionExhaustive(
     have_success = False
     have_failure = False
 
-    thread_pool = ThreadPool(2)
+    thread_pool = _ThreadPool(2)
     try:
       while True:
         num_trials += 1
@@ -606,7 +616,7 @@ def _assertSplitAtFractionConcurrent(
       return result
 
   inputs = []
-  pool = thread_pool if thread_pool else ThreadPool(2)
+  pool = thread_pool if thread_pool else _ThreadPool(2)
   try:
     inputs.append([True, reader_iter])
     inputs.append([False, range_tracker, split_fraction])

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index dedd732..db3ad9e 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -17,20 +17,15 @@
 
 """Unit tests for the Pipeline class."""
 
-import gc
 import logging
 import unittest
 
 from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
-from apache_beam.pvalue import AsIter
-from apache_beam.pvalue import SideOutputValue
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
-from apache_beam.transforms import CombinePerKey
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
-from apache_beam.transforms import Flatten
 from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import Read
@@ -185,65 +180,6 @@ class PipelineTest(unittest.TestCase):
         ['a-x', 'b-x', 'c-x'],
         sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x')))
 
-  def test_cached_pvalues_are_refcounted(self):
-    """Test that cached PValues are refcounted and deleted.
-
-    The intermediary PValues computed by the workflow below contain
-    one million elements so if the refcounting does not work the number of
-    objects tracked by the garbage collector will increase by a few millions
-    by the time we execute the final Map checking the objects tracked.
-    Anything that is much larger than what we started with will fail the test.
-    """
-    def check_memory(value, count_threshold):
-      gc.collect()
-      objects_count = len(gc.get_objects())
-      if objects_count > count_threshold:
-        raise RuntimeError(
-            'PValues are not refcounted: %s, %s' % (
-                objects_count, count_threshold))
-      return value
-
-    def create_dupes(o, _):
-      yield o
-      yield SideOutputValue('side', o)
-
-    num_elements = 100000
-
-    pipeline = Pipeline('DirectPipelineRunner')
-
-    gc.collect()
-    count_threshold = len(gc.get_objects()) + 10000
-    biglist = pipeline | 'oom:create' >> Create(['x'] * num_elements)
-    dupes = (
-        biglist
-        | 'oom:addone' >> Map(lambda x: (x, 1))
-        | 'oom:dupes' >> FlatMap(
-            create_dupes, AsIter(biglist)).with_outputs('side', main='main'))
-    result = (
-        (dupes.side, dupes.main, dupes.side)
-        | 'oom:flatten' >> Flatten()
-        | 'oom:combine' >> CombinePerKey(sum)
-        | 'oom:check' >> Map(check_memory, count_threshold))
-
-    assert_that(result, equal_to([('x', 3 * num_elements)]))
-    pipeline.run()
-    self.assertEqual(
-        pipeline.runner.debug_counters['element_counts'],
-        {
-            'oom:flatten': 3 * num_elements,
-            ('oom:combine/GroupByKey/reify_windows', None): 3 * num_elements,
-            ('oom:dupes/FlatMap(create_dupes)', 'side'): num_elements,
-            ('oom:dupes/FlatMap(create_dupes)', None): num_elements,
-            'oom:create': num_elements,
-            ('oom:addone', None): num_elements,
-            'oom:combine/GroupByKey/group_by_key': 1,
-            ('oom:check', None): 1,
-            'assert_that/singleton': 1,
-            ('assert_that/WindowInto', None): 1,
-            ('assert_that/Map(match)', None): 1,
-            ('oom:combine/GroupByKey/group_by_window', None): 1,
-            ('oom:combine/Combine/ParDo(CombineValuesDoFn)', None): 1})
-
   def test_pipeline_as_context(self):
     def raise_exception(exn):
       raise exn
@@ -257,20 +193,6 @@ class PipelineTest(unittest.TestCase):
     self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x))
 
 
-class DiskCachedRunnerPipelineTest(PipelineTest):
-
-  def setUp(self):
-    self.runner_name = 'DiskCachedPipelineRunner'
-
-  def test_cached_pvalues_are_refcounted(self):
-    # Takes long with disk spilling.
-    pass
-
-  def test_eager_pipeline(self):
-    # Tests eager runner only
-    pass
-
-
 class Bacon(PipelineOptions):
 
   @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
index 860391e..40ced50 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -21,7 +21,8 @@ This package defines runners, which are used to execute a pipeline.
 """
 
 from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
-from apache_beam.runners.direct_runner import DirectPipelineRunner
+from apache_beam.runners.direct.direct_runner import DirectPipelineRunner
+from apache_beam.runners.direct.direct_runner import EagerPipelineRunner
 from apache_beam.runners.runner import create_runner
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/__init__.py b/sdks/python/apache_beam/runners/direct/__init__.py
new file mode 100644
index 0000000..067f35a
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/__init__.py
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Inprocess runner executes pipelines locally in a single process."""
+from apache_beam.runners.direct.direct_runner import DirectPipelineRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
new file mode 100644
index 0000000..d284449
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A factory that creates UncommittedBundles."""
+
+from __future__ import absolute_import
+
+from apache_beam import pvalue
+
+
+class BundleFactory(object):
+  """BundleFactory creates output bundles to be used by transform 
evaluators."""
+
+  def create_bundle(self, output_pcollection):
+    return Bundle(output_pcollection)
+
+  def create_empty_committed_bundle(self, output_pcollection):
+    bundle = self.create_bundle(output_pcollection)
+    bundle.commit(None)
+    return bundle
+
+
+# A bundle represents a unit of work that will be processed by a transform.
+class Bundle(object):
+  """Part of a PCollection with output elements.
+
+  Part of a PCollection. Elements are output to a bundle, which will cause
+  them to be processed at a later point by the PTransforms that consume the
+  PCollection this bundle is part of. It starts as an uncommitted bundle and
+  can have elements added to it. It needs to be committed to make it
+  immutable before it is passed to a downstream PTransform.
+  """
+
+  def __init__(self, pcollection):
+    assert (isinstance(pcollection, pvalue.PCollection)
+            or isinstance(pcollection, pvalue.PCollectionView))
+    self._pcollection = pcollection
+    self._elements = []
+    self._committed = False
+    self._tag = None  # optional tag information for this bundle
+
+  @property
+  def elements(self):
+    """Returns iterable elements. If not committed will return a copy."""
+    if self._committed:
+      return self._elements
+    else:
+      return list(self._elements)
+
+  @property
+  def tag(self):
+    return self._tag
+
+  @tag.setter
+  def tag(self, value):
+    assert not self._tag
+    self._tag = value
+
+  @property
+  def pcollection(self):
+    """PCollection that the elements of this UncommittedBundle belong to."""
+    return self._pcollection
+
+  def add(self, element):
+    """Outputs an element to this bundle.
+
+    Args:
+      element: WindowedValue
+    """
+    assert not self._committed
+    self._elements.append(element)
+
+  def output(self, element):
+    self.add(element)
+
+  def commit(self, synchronized_processing_time):
+    """Commits this bundle.
+
+    Uncommitted bundle will become committed (immutable) after this call.
+
+    Args:
+      synchronized_processing_time: the synchronized processing time at which
+      this bundle was committed
+    """
+    assert not self._committed
+    self._committed = True
+    self._elements = tuple(self._elements)
+    self._synchronized_processing_time = synchronized_processing_time
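
To make the bundle lifecycle above concrete, a minimal caller-side sketch
(not part of the commit; real callers add WindowedValues, not raw strings):

    import apache_beam as beam
    from apache_beam.runners.direct.bundle_factory import BundleFactory

    p = beam.Pipeline('DirectPipelineRunner')
    pcoll = p | beam.Create([1, 2, 3])      # a real pvalue.PCollection

    factory = BundleFactory()
    bundle = factory.create_bundle(pcoll)
    bundle.add('element')                   # allowed only while uncommitted
    bundle.commit(None)                     # freezes it; elements -> tuple
    assert bundle.elements == ('element',)  # committed: no defensive copy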

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/clock.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/clock.py b/sdks/python/apache_beam/runners/direct/clock.py
new file mode 100644
index 0000000..11e49cd
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/clock.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Clock implementations for real time processing and testing."""
+
+from __future__ import absolute_import
+
+import time
+
+
+class Clock(object):
+
+  @property
+  def now(self):
+    """Returns the number of milliseconds since epoch."""
+    return int(time.time() * 1000)
+
+
+class MockClock(Clock):
+  """Mock clock implementation for testing."""
+
+  def __init__(self, now_in_ms):
+    self._now_in_ms = now_in_ms
+
+  @property
+  def now(self):
+    return self._now_in_ms
+
+  @now.setter
+  def now(self, value_in_ms):
+    assert value_in_ms >= self._now_in_ms
+    self._now_in_ms = value_in_ms
+
+  def advance(self, duration_in_ms):
+    assert duration_in_ms >= 0
+    self._now_in_ms += duration_in_ms
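
A short usage sketch for MockClock (illustrative only): time may be advanced
or set forward, but the asserts forbid moving it backwards.

    from apache_beam.runners.direct.clock import MockClock

    clock = MockClock(1000)    # start at 1000 ms since epoch
    clock.advance(500)
    assert clock.now == 1500
    clock.now = 2000           # the setter also enforces monotonicity
    # clock.now = 100          # would fail: new time must be >= current time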

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
new file mode 100644
index 0000000..6f1757a
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""ConsumerTrackingPipelineVisitor, a PipelineVisitor object."""
+
+from __future__ import absolute_import
+
+from apache_beam import pvalue
+from apache_beam.pipeline import PipelineVisitor
+
+
+class ConsumerTrackingPipelineVisitor(PipelineVisitor):
+  """Visitor for extracting value-consumer relations from the graph.
+
+  Tracks the AppliedPTransforms that consume each PValue in the Pipeline. This
+  is used to schedule consuming PTransforms to consume input after the upstream
+  transform has produced and committed output.
+  """
+
+  def __init__(self):
+    self.value_to_consumers = {}  # Map from PValue to [AppliedPTransform].
+    self.root_transforms = set()  # set of (root) AppliedPTransforms.
+    self.views = []               # list of PCollectionViews.
+    self.step_names = {}          # Map from AppliedPTransform to String.
+
+    self._num_transforms = 0
+
+  def visit_value(self, value, producer_node):
+    if value:
+      if isinstance(value, pvalue.PCollectionView):
+        self.views.append(value)
+
+  def visit_transform(self, applied_ptransform):
+    inputs = applied_ptransform.inputs
+    if inputs:
+      for input_value in inputs:
+        if isinstance(input_value, pvalue.PBegin):
+          self.root_transforms.add(applied_ptransform)
+        if input_value not in self.value_to_consumers:
+          self.value_to_consumers[input_value] = []
+        self.value_to_consumers[input_value].append(applied_ptransform)
+    else:
+      self.root_transforms.add(applied_ptransform)
+    self.step_names[applied_ptransform] = 's%d' % (self._num_transforms)
+    self._num_transforms += 1
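
For orientation (a sketch; the real wiring appears in direct_runner.py
below), the runner drives this visitor over the graph and reads its maps:

    import apache_beam as beam
    from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import (
        ConsumerTrackingPipelineVisitor)

    p = beam.Pipeline('DirectPipelineRunner')
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)

    visitor = ConsumerTrackingPipelineVisitor()
    p.visit(visitor)                  # walks the applied-transform graph
    print visitor.root_transforms     # here: the applied Create transform
    print visitor.value_to_consumers  # PValue -> [AppliedPTransform]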

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
new file mode 100644
index 0000000..b87f7b1
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for consumer_tracking_pipeline_visitor."""
+
+import logging
+import unittest
+
+from apache_beam import pvalue
+from apache_beam.io import Read
+from apache_beam.io import TextFileSource
+from apache_beam.pipeline import Pipeline
+from apache_beam.pvalue import AsList
+from apache_beam.runners.direct import DirectPipelineRunner
+from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
+from apache_beam.transforms import CoGroupByKey
+from apache_beam.transforms import Create
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import FlatMap
+from apache_beam.transforms import Flatten
+from apache_beam.transforms import ParDo
+
+# Disable frequent lint warning due to pipe operator for chaining transforms.
+# pylint: disable=expression-not-assigned
+# pylint: disable=pointless-statement
+
+
+class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
+
+  def setUp(self):
+    self.pipeline = Pipeline(DirectPipelineRunner())
+    self.visitor = ConsumerTrackingPipelineVisitor()
+
+  def test_root_transforms(self):
+    root_create = Create('create', [[1, 2, 3]])
+    root_read = Read('read', TextFileSource('/tmp/somefile'))
+    root_flatten = Flatten('flatten', pipeline=self.pipeline)
+
+    pbegin = pvalue.PBegin(self.pipeline)
+    pcoll_create = pbegin | root_create
+    pbegin | root_read
+    pcoll_create | FlatMap(lambda x: x)
+    [] | root_flatten
+
+    self.pipeline.visit(self.visitor)
+
+    root_transforms = sorted(
+        [t.transform for t in self.visitor.root_transforms])
+    self.assertEqual(root_transforms, sorted(
+        [root_read, root_create, root_flatten]))
+
+    pbegin_consumers = sorted(
+        [c.transform for c in self.visitor.value_to_consumers[pbegin]])
+    self.assertEqual(pbegin_consumers, sorted([root_read, root_create]))
+    self.assertEqual(len(self.visitor.step_names), 4)
+
+  def test_side_inputs(self):
+
+    class SplitNumbersFn(DoFn):
+
+      def process(self, context):
+        if context.element < 0:
+          yield pvalue.SideOutputValue('tag_negative', context.element)
+        else:
+          yield context.element
+
+    class ProcessNumbersFn(DoFn):
+
+      def process(self, context, negatives):
+        yield context.element
+
+    root_create = Create('create', [[-1, 2, 3]])
+
+    result = (self.pipeline
+              | root_create
+              | ParDo(SplitNumbersFn()).with_outputs('tag_negative',
+                                                     main='positive'))
+    positive, negative = result
+    positive | ParDo(ProcessNumbersFn(), AsList(negative))
+
+    self.pipeline.visit(self.visitor)
+
+    root_transforms = sorted(
+        [t.transform for t in self.visitor.root_transforms])
+    self.assertEqual(root_transforms, sorted([root_create]))
+    self.assertEqual(len(self.visitor.step_names), 4)
+    self.assertEqual(len(self.visitor.views), 1)
+    self.assertTrue(isinstance(self.visitor.views[0],
+                               pvalue.ListPCollectionView))
+
+  def test_co_group_by_key(self):
+    emails = self.pipeline | 'email' >> Create([('joe', 'j...@example.com')])
+    phones = self.pipeline | 'phone' >> Create([('mary', '111-222-3333')])
+    {'emails': emails, 'phones': phones} | CoGroupByKey()
+
+    self.pipeline.visit(self.visitor)
+
+    root_transforms = sorted(
+        [t.transform for t in self.visitor.root_transforms])
+    self.assertEqual(len(root_transforms), 2)
+    self.assertGreater(
+        len(self.visitor.step_names), 3)  # 2 creates + expanded CoGBK
+    self.assertEqual(len(self.visitor.views), 0)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
new file mode 100644
index 0000000..2e5fe74
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""DirectPipelineRunner, executing on the local machine.
+
+The DirectPipelineRunner is a runner implementation that executes the entire
+graph of transformations belonging to a pipeline on the local machine.
+"""
+
+from __future__ import absolute_import
+
+import collections
+import logging
+
+from apache_beam.runners.direct.bundle_factory import BundleFactory
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.runners.runner import PipelineRunner
+from apache_beam.runners.runner import PipelineState
+from apache_beam.runners.runner import PValueCache
+
+
+class DirectPipelineRunner(PipelineRunner):
+  """Executes a single pipeline on the local machine."""
+
+  def __init__(self):
+    self._cache = None
+
+  def run(self, pipeline):
+    """Execute the entire pipeline and returns an DirectPipelineResult."""
+
+    # TODO: Move imports to top. A Pipeline <-> Runner circular dependency
+    # causes problems resolving imports when they are at the top.
+    # pylint: disable=wrong-import-position
+    from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import \
+      ConsumerTrackingPipelineVisitor
+    from apache_beam.runners.direct.evaluation_context import EvaluationContext
+    from apache_beam.runners.direct.executor import Executor
+    from apache_beam.runners.direct.transform_evaluator import \
+      TransformEvaluatorRegistry
+
+    logging.info('Running pipeline with DirectPipelineRunner.')
+    self.visitor = ConsumerTrackingPipelineVisitor()
+    pipeline.visit(self.visitor)
+
+    evaluation_context = EvaluationContext(
+        pipeline.options,
+        BundleFactory(),
+        self.visitor.root_transforms,
+        self.visitor.value_to_consumers,
+        self.visitor.step_names,
+        self.visitor.views)
+
+    evaluation_context.use_pvalue_cache(self._cache)
+
+    executor = Executor(self.visitor.value_to_consumers,
+                        TransformEvaluatorRegistry(evaluation_context),
+                        evaluation_context)
+    # Start the executor. This is a non-blocking call; it starts the
+    # execution in background threads and returns.
+    executor.start(self.visitor.root_transforms)
+    result = DirectPipelineResult(executor, evaluation_context)
+
+    # TODO(altay): If blocking:
+    # Block until the pipeline completes. This call will return after the
+    # pipeline was fully terminated (successfully or with a failure).
+    result.await_completion()
+
+    if self._cache:
+      self._cache.finalize()
+
+    return result
+
+  @property
+  def cache(self):
+    if not self._cache:
+      self._cache = BufferingInMemoryCache()
+    return self._cache.pvalue_cache
+
+  def apply(self, transform, input):  # pylint: disable=redefined-builtin
+    """Runner callback for a pipeline.apply call."""
+    return transform.apply(input)
+
+
+class BufferingInMemoryCache(object):
+  """PValueCache wrapper for buffering bundles until a PValue is fully 
computed.
+
+  BufferingInMemoryCache keeps an in-memory cache of
+  (applied_ptransform, tag) tuples. It accepts appends to existing cache
+  entries until it is finalized. finalize() makes all existing cached
+  entries visible to the underlying PValueCache in their entirety, clears the
+  in-memory cache, and stops accepting new cache entries.
+  """
+
+  def __init__(self):
+    self._cache = collections.defaultdict(list)
+    self._pvalue_cache = PValueCache()
+    self._finalized = False
+
+  @property
+  def pvalue_cache(self):
+    return self._pvalue_cache
+
+  def append(self, applied_ptransform, tag, elements):
+    assert not self._finalized
+    assert elements is not None
+    self._cache[(applied_ptransform, tag)].extend(elements)
+
+  def finalize(self):
+    """Make buffered cache elements visible to the underlying PValueCache."""
+    assert not self._finalized
+    for key, value in self._cache.iteritems():
+      applied_ptransform, tag = key
+      self._pvalue_cache.cache_output(applied_ptransform, tag, value)
+    # Drop the buffer only after all entries are flushed, and refuse any
+    # further appends from this point on.
+    self._cache = None
+    self._finalized = True
+
+
+class DirectPipelineResult(PipelineResult):
+  """A DirectPipelineResult provides access to info about a pipeline."""
+
+  def __init__(self, executor, evaluation_context):
+    super(DirectPipelineResult, self).__init__(PipelineState.RUNNING)
+    self._executor = executor
+    self._evaluation_context = evaluation_context
+
+  def _is_in_terminal_state(self):
+    return self._state is not PipelineState.RUNNING
+
+  def await_completion(self):
+    if not self._is_in_terminal_state():
+      try:
+        self._executor.await_completion()
+        self._state = PipelineState.DONE
+      except:  # pylint: disable=broad-except
+        self._state = PipelineState.FAILED
+        raise
+    return self._state
+
+  def aggregated_values(self, aggregator_or_name):
+    return self._evaluation_context.get_aggregator_values(aggregator_or_name)
+
+
+class EagerPipelineRunner(DirectPipelineRunner):
+
+  is_eager = True
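
As a sketch of the caching contract above (schematic; applied_transform
stands in for a real AppliedPTransform from the pipeline graph):

    cache = BufferingInMemoryCache()
    cache.append(applied_transform, None, ['a', 'b'])  # buffered per (step, tag)
    cache.append(applied_transform, None, ['c'])       # appends accumulate
    cache.finalize()   # flush every buffered entry into the PValueCache
    # After finalize() the buffer is dropped and further appends are rejected.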

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
new file mode 100644
index 0000000..660574b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -0,0 +1,272 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""EvaluationContext tracks global state, triggers and watermarks."""
+
+from __future__ import absolute_import
+
+import collections
+import threading
+
+from apache_beam.transforms import sideinputs
+from apache_beam.runners.direct.clock import Clock
+from apache_beam.runners.direct.watermark_manager import WatermarkManager
+from apache_beam.runners.direct.executor import TransformExecutor
+from apache_beam.utils import counters
+
+
+class _ExecutionContext(object):
+
+  def __init__(self, watermarks, existing_state):
+    self._watermarks = watermarks
+    self._existing_state = existing_state
+
+  @property
+  def watermarks(self):
+    return self._watermarks
+
+  @property
+  def existing_state(self):
+    return self._existing_state
+
+
+class _SideInputView(object):
+
+  def __init__(self, view):
+    self._view = view
+    self.callable_queue = collections.deque()
+    self.value = None
+    self.has_result = False
+
+
+class _SideInputsContainer(object):
+  """An in-process container for PCollectionViews.
+
+  It provides methods for blocking until a side-input is available and writing
+  to a side input.
+  """
+
+  def __init__(self, views):
+    self._lock = threading.Lock()
+    self._views = {}
+    for view in views:
+      self._views[view] = _SideInputView(view)
+
+  def get_value_or_schedule_after_output(self, pcollection_view, task):
+    with self._lock:
+      view = self._views[pcollection_view]
+      if not view.has_result:
+        view.callable_queue.append(task)
+        task.blocked = True
+      return (view.has_result, view.value)
+
+  def set_value_and_get_callables(self, pcollection_view, values):
+    with self._lock:
+      view = self._views[pcollection_view]
+      assert not view.has_result
+      assert view.value is None
+      assert view.callable_queue is not None
+      view.value = self._pvalue_to_value(pcollection_view, values)
+      result = tuple(view.callable_queue)
+      for task in result:
+        task.blocked = False
+      view.callable_queue = None
+      view.has_result = True
+      return result
+
+  def _pvalue_to_value(self, view, values):
+    """Given a PCollectionView, returns the associated value in requested form.
+
+    Args:
+      view: PCollectionView for the requested side input.
+      values: Iterable values associated with the side input.
+
+    Returns:
+      The side input in its requested form.
+
+    Raises:
+      ValueError: If values cannot be converted into the requested form.
+    """
+    return sideinputs.SideInputMap(type(view), view._view_options(), values)
+
+
+class EvaluationContext(object):
+  """Evaluation context with the global state information of the pipeline.
+
+  The evaluation context for a specific pipeline being executed by the
+  DirectPipelineRunner. Contains state shared within the execution across all
+  transforms.
+
+  EvaluationContext contains shared state for an execution of the
+  DirectPipelineRunner that can be used while evaluating a PTransform. This
+  consists of views into underlying state and watermark implementations, access
+  to read and write PCollectionViews, and constructing counter sets and
+  execution contexts. This includes executing callbacks asynchronously when
+  state changes to the appropriate point (e.g. when a PCollectionView is
+  requested and known to be empty).
+
+  EvaluationContext also handles results by committing and finalizing
+  bundles based on the current global state and updating the global state
+  appropriately. This includes updating the per-(step,key) state, updating
+  global watermarks, and executing any callbacks that can be executed.
+  """
+
+  def __init__(self, pipeline_options, bundle_factory, root_transforms,
+               value_to_consumers, step_names, views):
+    self.pipeline_options = pipeline_options
+    self._bundle_factory = bundle_factory
+    self._root_transforms = root_transforms
+    self._value_to_consumers = value_to_consumers
+    self._step_names = step_names
+    self.views = views
+
+    # AppliedPTransform -> Evaluator specific state objects
+    self._application_state_internals = {}
+    self._watermark_manager = WatermarkManager(
+        Clock(), root_transforms, value_to_consumers)
+    self._side_inputs_container = _SideInputsContainer(views)
+    self._pending_unblocked_tasks = []
+    self._counter_factory = counters.CounterFactory()
+    self._cache = None
+
+    self._lock = threading.Lock()
+
+  def use_pvalue_cache(self, cache):
+    assert not self._cache
+    self._cache = cache
+
+  @property
+  def has_cache(self):
+    return self._cache is not None
+
+  def append_to_cache(self, applied_ptransform, tag, elements):
+    with self._lock:
+      assert self._cache
+      self._cache.append(applied_ptransform, tag, elements)
+
+  def is_root_transform(self, applied_ptransform):
+    return applied_ptransform in self._root_transforms
+
+  def handle_result(
+      self, completed_bundle, completed_timers, result):
+    """Handle the provided result produced after evaluating the input bundle.
+
+    Handle the provided TransformResult, produced after evaluating
+    the provided committed bundle (potentially None, if the result of a root
+    PTransform).
+
+    The result is the output of running the transform contained in the
+    TransformResult on the contents of the provided bundle.
+
+    Args:
+      completed_bundle: the bundle that was processed to produce the result.
+      completed_timers: the timers that were delivered to produce the
+                        completed_bundle.
+      result: the TransformResult of evaluating the input bundle
+
+    Returns:
+      the committed bundles contained within the handled result.
+    """
+    with self._lock:
+      committed_bundles = self._commit_bundles(result.output_bundles)
+      self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, result.watermark_hold)
+
+      # If the result is for a view, update side inputs container.
+      if (result.output_bundles
+          and result.output_bundles[0].pcollection in self.views):
+        if committed_bundles:
+          assert len(committed_bundles) == 1
+          side_input_result = committed_bundles[0].elements
+        else:
+          side_input_result = []
+        tasks = self._side_inputs_container.set_value_and_get_callables(
+            result.output_bundles[0].pcollection, side_input_result)
+        self._pending_unblocked_tasks.extend(tasks)
+
+      if result.counters:
+        for counter in result.counters:
+          merged_counter = self._counter_factory.get_counter(
+              counter.name, counter.combine_fn)
+          merged_counter.accumulator.merge([counter.accumulator])
+
+      self._application_state_internals[result.transform] = result.state
+      return committed_bundles
+
+  def get_aggregator_values(self, aggregator_or_name):
+    return self._counter_factory.get_aggregator_values(aggregator_or_name)
+
+  def schedule_pending_unblocked_tasks(self, executor_service):
+    if self._pending_unblocked_tasks:
+      with self._lock:
+        for task in self._pending_unblocked_tasks:
+          executor_service.submit(task)
+        self._pending_unblocked_tasks = []
+
+  def _commit_bundles(self, uncommitted_bundles):
+    """Commits bundles and returns a immutable set of committed bundles."""
+    for in_progress_bundle in uncommitted_bundles:
+      producing_applied_ptransform = in_progress_bundle.pcollection.producer
+      watermarks = self._watermark_manager.get_watermarks(
+          producing_applied_ptransform)
+      in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
+    return tuple(uncommitted_bundles)
+
+  def get_execution_context(self, applied_ptransform):
+    return _ExecutionContext(
+        self._watermark_manager.get_watermarks(applied_ptransform),
+        self._application_state_internals.get(applied_ptransform))
+
+  def create_bundle(self, output_pcollection):
+    """Create an uncommitted bundle for the specified PCollection."""
+    return self._bundle_factory.create_bundle(output_pcollection)
+
+  def create_empty_committed_bundle(self, output_pcollection):
+    """Create empty bundle useful for triggering evaluation."""
+    return self._bundle_factory.create_empty_committed_bundle(
+        output_pcollection)
+
+  def extract_fired_timers(self):
+    return self._watermark_manager.extract_fired_timers()
+
+  def is_done(self, transform=None):
+    """Checks completion of a step or the pipeline.
+
+    Args:
+      transform: AppliedPTransform to check for completion.
+
+    Returns:
+      True if the step will not produce additional output. If transform is
+      None, returns True only if all steps are done.
+    """
+    if transform:
+      return self._is_transform_done(transform)
+    else:
+      for applied_ptransform in self._step_names:
+        if not self._is_transform_done(applied_ptransform):
+          return False
+      return True
+
+  def _is_transform_done(self, transform):
+    tw = self._watermark_manager.get_watermarks(transform)
+    return tw.output_watermark == WatermarkManager.WATERMARK_POS_INF
+
+  def get_value_or_schedule_after_output(self, pcollection_view, task):
+    assert isinstance(task, TransformExecutor)
+    return self._side_inputs_container.get_value_or_schedule_after_output(
+        pcollection_view, task)
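
The side-input handshake above works roughly as follows (a schematic sketch,
not code from the commit): a TransformExecutor asks for a view's value; if it
is not yet available the executor is queued and marked blocked, and when the
producing transform's result is handled the queued executors are released for
rescheduling by the monitor task.

    # Inside a TransformExecutor (see executor.py below):
    has_result, value = context.get_value_or_schedule_after_output(view, task)
    if not has_result:
      return  # task was queued; it is resubmitted once the view is ready

    # Later, inside handle_result(), when the view's producer commits output:
    tasks = side_inputs_container.set_value_and_get_callables(view, elements)
    pending_unblocked_tasks.extend(tasks)  # drained by the monitor task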

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
new file mode 100644
index 0000000..0f1c53b
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -0,0 +1,550 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An executor that schedules and executes applied ptransforms."""
+
+from __future__ import absolute_import
+
+import collections
+import logging
+import Queue
+import threading
+import traceback
+from weakref import WeakValueDictionary
+
+
+class ExecutorService(object):
+  """Thread pool for executing tasks in parallel."""
+
+  class CallableTask(object):
+
+    def __call__(self):
+      pass
+
+    @property
+    def name(self):
+      return None
+
+  class ExecutorServiceWorker(threading.Thread):
+    """Worker thread for executing a single task at a time."""
+
+    # Amount to block waiting for getting an item from the queue in seconds.
+    TIMEOUT = 5
+
+    def __init__(self, queue, index):
+      super(ExecutorService.ExecutorServiceWorker, self).__init__()
+      self.queue = queue
+      self._index = index
+      self._default_name = 'ExecutorServiceWorker-' + str(index)
+      self._update_name()
+      self.shutdown_requested = False
+      self.start()
+
+    def _update_name(self, task=None):
+      if task and task.name:
+        name = task.name
+      else:
+        name = self._default_name
+      self.name = 'Thread: %d, %s (%s)' % (
+          self._index, name, 'executing' if task else 'idle')
+
+    def _get_task_or_none(self):
+      try:
+        # Do not block indefinitely, otherwise we may not act for a requested
+        # shutdown.
+        return self.queue.get(
+            timeout=ExecutorService.ExecutorServiceWorker.TIMEOUT)
+      except Queue.Empty:
+        return None
+
+    def run(self):
+      while not self.shutdown_requested:
+        task = self._get_task_or_none()
+        if task:
+          try:
+            if not self.shutdown_requested:
+              self._update_name(task)
+              task()
+              self._update_name()
+          finally:
+            self.queue.task_done()
+
+    def shutdown(self):
+      self.shutdown_requested = True
+
+  def __init__(self, num_workers):
+    self.queue = Queue.Queue()
+    self.workers = [ExecutorService.ExecutorServiceWorker(
+        self.queue, i) for i in range(num_workers)]
+    self.shutdown_requested = False
+
+  def submit(self, task):
+    assert isinstance(task, ExecutorService.CallableTask)
+    if not self.shutdown_requested:
+      self.queue.put(task)
+
+  def await_completion(self):
+    for worker in self.workers:
+      worker.join()
+
+  def shutdown(self):
+    self.shutdown_requested = True
+
+    for worker in self.workers:
+      worker.shutdown()
+
+    # Consume all the remaining items in the queue
+    while not self.queue.empty():
+      try:
+        self.queue.get_nowait()
+        self.queue.task_done()
+      except Queue.Empty:
+        continue
+    # All existing threads will eventually terminate (after they complete their
+    # last task).
+
+
+class TransformEvaluationState(object):
+
+  def __init__(self, executor_service, scheduled):
+    self.executor_service = executor_service
+    self.scheduled = scheduled
+
+  def schedule(self, work):
+    self.scheduled.add(work)
+    self.executor_service.submit(work)
+
+  def complete(self, completed_work):
+    self.scheduled.remove(completed_work)
+
+
+class ParallelEvaluationState(TransformEvaluationState):
+  """A TransformEvaluationState with unlimited parallelism.
+
+  Any TransformExecutor scheduled will be immediately submitted to the
+  ExecutorService.
+
+  A principal use of this is for evaluators that can generate output bundles
+  only using the input bundle (e.g. ParDo).
+  """
+  pass
+
+
+class SerialEvaluationState(TransformEvaluationState):
+  """A TransformEvaluationState with a single work queue.
+
+  Any TransformExecutor scheduled will be placed on the work queue. Only one
+  item of work will be submitted to the ExecutorService at any time.
+
+  A principal use of this is for evaluators that keep global state, such as
+  GroupByKeyOnly.
+  """
+
+  def __init__(self, executor_service, scheduled):
+    super(SerialEvaluationState, self).__init__(executor_service, scheduled)
+    self.serial_queue = collections.deque()
+    self.currently_evaluating = None
+    self._lock = threading.Lock()
+
+  def complete(self, completed_work):
+    self._update_currently_evaluating(None, completed_work)
+    super(SerialEvaluationState, self).complete(completed_work)
+
+  def schedule(self, new_work):
+    self._update_currently_evaluating(new_work, None)
+
+  def _update_currently_evaluating(self, new_work, completed_work):
+    with self._lock:
+      if new_work:
+        self.serial_queue.append(new_work)
+      if completed_work:
+        assert self.currently_evaluating == completed_work
+        self.currently_evaluating = None
+      if self.serial_queue and not self.currently_evaluating:
+        next_work = self.serial_queue.pop()
+        self.currently_evaluating = next_work
+        super(SerialEvaluationState, self).schedule(next_work)
+
+
+class TransformExecutorServices(object):
+  """Schedules and completes TransformExecutors.
+
+  Controls the concurrency as appropriate for the applied transform the executor
+  exists for.
+  """
+
+  def __init__(self, executor_service):
+    self._executor_service = executor_service
+    self._scheduled = set()
+    self._parallel = ParallelEvaluationState(
+        self._executor_service, self._scheduled)
+    self._serial_cache = WeakValueDictionary()
+
+  def parallel(self):
+    return self._parallel
+
+  def serial(self, step):
+    cached = self._serial_cache.get(step)
+    if not cached:
+      cached = SerialEvaluationState(self._executor_service, self._scheduled)
+      self._serial_cache[step] = cached
+    return cached
+
+  @property
+  def executors(self):
+    return frozenset(self._scheduled)
+
+
+class _CompletionCallback(object):
+  """The default completion callback.
+
+  The default completion callback is used to complete transform evaluations
+  that are triggered due to the arrival of elements from an upstream transform,
+  or for a source transform.
+  """
+
+  def __init__(self, evaluation_context, all_updates, timers=None):
+    self._evaluation_context = evaluation_context
+    self._all_updates = all_updates
+    self._timers = timers
+
+  def handle_result(self, input_committed_bundle, transform_result):
+    output_committed_bundles = self._evaluation_context.handle_result(
+        input_committed_bundle, self._timers, transform_result)
+    for output_committed_bundle in output_committed_bundles:
+      self._all_updates.offer(_ExecutorServiceParallelExecutor.ExecutorUpdate(
+          output_committed_bundle, None))
+    return output_committed_bundles
+
+  def handle_exception(self, exception):
+    self._all_updates.offer(
+        _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception))
+
+
+class _TimerCompletionCallback(_CompletionCallback):
+
+  def __init__(self, evaluation_context, all_updates, timers):
+    super(_TimerCompletionCallback, self).__init__(
+        evaluation_context, all_updates, timers)
+
+
+class TransformExecutor(ExecutorService.CallableTask):
+  """TransformExecutor will evaluate a bundle using an applied ptransform.
+
+  A CallableTask responsible for constructing a TransformEvaluator and
+  evaluating it on some bundle of input, and registering the result using
+  the completion callback.
+  """
+
+  def __init__(self, transform_evaluator_registry, evaluation_context,
+               input_bundle, applied_transform, completion_callback,
+               transform_evaluation_state):
+    self._transform_evaluator_registry = transform_evaluator_registry
+    self._evaluation_context = evaluation_context
+    self._input_bundle = input_bundle
+    self._applied_transform = applied_transform
+    self._completion_callback = completion_callback
+    self._transform_evaluation_state = transform_evaluation_state
+    self._side_input_values = {}
+    self.blocked = False
+    self._call_count = 0
+
+  def __call__(self):
+    self._call_count += 1
+    assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
+
+    for side_input in self._applied_transform.side_inputs:
+      if side_input not in self._side_input_values:
+        has_result, value = (
+            self._evaluation_context.get_value_or_schedule_after_output(
+                side_input, self))
+        if not has_result:
+          # Monitor task will reschedule this executor once the side input is
+          # available.
+          return
+        self._side_input_values[side_input] = value
+
+    side_input_values = [self._side_input_values[side_input]
+                         for side_input in self._applied_transform.side_inputs]
+
+    try:
+      evaluator = self._transform_evaluator_registry.for_application(
+          self._applied_transform, self._input_bundle, side_input_values)
+
+      if self._input_bundle:
+        for value in self._input_bundle.elements:
+          evaluator.process_element(value)
+
+      result = evaluator.finish_bundle()
+
+      if self._evaluation_context.has_cache:
+        for uncommitted_bundle in result.output_bundles:
+          self._evaluation_context.append_to_cache(
+              self._applied_transform, uncommitted_bundle.tag,
+              uncommitted_bundle.elements)
+        undeclared_tag_values = result.undeclared_tag_values
+        if undeclared_tag_values:
+          for tag, value in undeclared_tag_values.iteritems():
+            self._evaluation_context.append_to_cache(
+                self._applied_transform, tag, value)
+
+      self._completion_callback.handle_result(self._input_bundle, result)
+      return result
+    except Exception as e:  # pylint: disable=broad-except
+      logging.warning('Task failed: %s', traceback.format_exc(), exc_info=True)
+      self._completion_callback.handle_exception(e)
+    finally:
+      self._transform_evaluation_state.complete(self)
+
+
+class Executor(object):
+
+  def __init__(self, *args, **kwargs):
+    self._executor = _ExecutorServiceParallelExecutor(*args, **kwargs)
+
+  def start(self, roots):
+    self._executor.start(roots)
+
+  def await_completion(self):
+    self._executor.await_completion()
+
+
+class _ExecutorServiceParallelExecutor(object):
+  """An internal implementation for Executor."""
+
+  NUM_WORKERS = 1
+
+  def __init__(self, value_to_consumers, transform_evaluator_registry,
+               evaluation_context):
+    self.executor_service = ExecutorService(
+        _ExecutorServiceParallelExecutor.NUM_WORKERS)
+    self.transform_executor_services = TransformExecutorServices(
+        self.executor_service)
+    self.value_to_consumers = value_to_consumers
+    self.transform_evaluator_registry = transform_evaluator_registry
+    self.evaluation_context = evaluation_context
+    self.all_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
+        _ExecutorServiceParallelExecutor.ExecutorUpdate)
+    self.visible_updates = _ExecutorServiceParallelExecutor._TypedUpdateQueue(
+        _ExecutorServiceParallelExecutor.VisibleExecutorUpdate)
+    self.default_completion_callback = _CompletionCallback(
+        evaluation_context, self.all_updates)
+
+  def start(self, roots):
+    self.root_nodes = frozenset(roots)
+    self.executor_service.submit(
+        _ExecutorServiceParallelExecutor._MonitorTask(self))
+
+  def await_completion(self):
+    update = self.visible_updates.take()
+    try:
+      if update.exception:
+        raise update.exception
+    finally:
+      self.executor_service.shutdown()
+
+  def schedule_consumers(self, committed_bundle):
+    if committed_bundle.pcollection in self.value_to_consumers:
+      consumers = self.value_to_consumers[committed_bundle.pcollection]
+      for applied_ptransform in consumers:
+        self.schedule_consumption(applied_ptransform, committed_bundle,
+                                  self.default_completion_callback)
+
+  def schedule_consumption(self, consumer_applied_transform, committed_bundle,
+                           on_complete):
+    """Schedules evaluation of the given bundle with the transform."""
+    assert all([consumer_applied_transform, on_complete])
+    assert committed_bundle or consumer_applied_transform in self.root_nodes
+    if (committed_bundle
+        and self.transform_evaluator_registry.should_execute_serially(
+            consumer_applied_transform)):
+      transform_executor_service = self.transform_executor_services.serial(
+          consumer_applied_transform)
+    else:
+      transform_executor_service = self.transform_executor_services.parallel()
+
+    transform_executor = TransformExecutor(
+        self.transform_evaluator_registry, self.evaluation_context,
+        committed_bundle, consumer_applied_transform, on_complete,
+        transform_executor_service)
+    transform_executor_service.schedule(transform_executor)
+
+  class _TypedUpdateQueue(object):
+    """Type checking update queue with blocking and non-blocking operations."""
+
+    def __init__(self, item_type):
+      self._item_type = item_type
+      self._queue = Queue.Queue()
+
+    def poll(self):
+      try:
+        item = self._queue.get_nowait()
+        self._queue.task_done()
+        return item
+      except Queue.Empty:
+        return None
+
+    def take(self):
+      item = self._queue.get()
+      self._queue.task_done()
+      return item
+
+    def offer(self, item):
+      assert isinstance(item, self._item_type)
+      self._queue.put_nowait(item)
+
+  class ExecutorUpdate(object):
+    """An internal status update on the state of the executor."""
+
+    def __init__(self, produced_bundle=None, exception=None):
+      # Exactly one of these should be non-None.
+      assert bool(produced_bundle) != bool(exception)
+      self.committed_bundle = produced_bundle
+      self.exception = exception
+
+  class VisibleExecutorUpdate(object):
+    """An update of interest to the user.
+
+    Used when awaiting completion to decide whether to return normally or
+    raise an exception.
+    """
+
+    def __init__(self, exception=None):
+      self.finished = exception is not None
+      self.exception = exception
+
+  class _MonitorTask(ExecutorService.CallableTask):
+    """MonitorTask continuously runs to ensure that pipeline makes progress."""
+
+    def __init__(self, executor):
+      self._executor = executor
+
+    @property
+    def name(self):
+      return 'monitor'
+
+    def __call__(self):
+      try:
+        update = self._executor.all_updates.poll()
+        while update:
+          if update.committed_bundle:
+            self._executor.schedule_consumers(update.committed_bundle)
+          else:
+            assert update.exception
+            logging.warning('A task failed with an exception.\n %s',
+                            update.exception)
+            self._executor.visible_updates.offer(
+                _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+                    update.exception))
+          update = self._executor.all_updates.poll()
+        self._executor.evaluation_context.schedule_pending_unblocked_tasks(
+            self._executor.executor_service)
+        self._add_work_if_necessary(self._fire_timers())
+      except Exception as e:  # pylint: disable=broad-except
+        logging.error('Monitor task died due to an exception.\n %s', e)
+        self._executor.visible_updates.offer(
+            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(e))
+      finally:
+        if not self._should_shutdown():
+          self._executor.executor_service.submit(self)
+
+    def _should_shutdown(self):
+      """_should_shutdown checks whether pipeline is completed or not.
+
+      It will check for successful completion by checking the watermarks of all
+      transforms. If they all reached the maximum watermark it means that
+      pipeline successfully reached to completion.
+
+      If the above is not true, it will check that at least one executor is
+      making progress. Otherwise pipeline will be declared stalled.
+
+      If the pipeline reached to a terminal state as explained above
+      _should_shutdown will request executor to gracefully shutdown.
+
+      Returns:
+        True if pipeline reached a terminal state and monitor task could 
finish.
+        Otherwise monitor task should schedule itself again for future
+        execution.
+      """
+      if self._executor.evaluation_context.is_done():
+        self._executor.visible_updates.offer(
+            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate())
+        self._executor.executor_service.shutdown()
+        return True
+      elif not self._is_executing():
+        self._executor.visible_updates.offer(
+            _ExecutorServiceParallelExecutor.VisibleExecutorUpdate(
+                Exception('Monitor task detected a pipeline stall.')))
+        self._executor.executor_service.shutdown()
+        return True
+      return False
+
+    def _fire_timers(self):
+      """Schedules triggered consumers if any timers fired.
+
+      Returns:
+        True if timers fired.
+      """
+      fired_timers = self._executor.evaluation_context.extract_fired_timers()
+      for applied_ptransform in fired_timers:
+        # Use an empty committed bundle, just to trigger the timers.
+        empty_bundle = (
+            self._executor.evaluation_context.create_empty_committed_bundle(
+                applied_ptransform.inputs[0]))
+        timer_completion_callback = _TimerCompletionCallback(
+            self._executor.evaluation_context, self._executor.all_updates,
+            applied_ptransform)
+
+        self._executor.schedule_consumption(
+            applied_ptransform, empty_bundle, timer_completion_callback)
+      return bool(fired_timers)
+
+    def _is_executing(self):
+      """Returns True if there is at least one non-blocked 
TransformExecutor."""
+      for transform_executor in (
+          self._executor.transform_executor_services.executors):
+        if not transform_executor.blocked:
+          return True
+      return False
+
+    def _add_work_if_necessary(self, timers_fired):
+      """Adds more work from the roots if pipeline requires more input.
+
+      If all active TransformExecutors are in a blocked state, add more work
+      from root nodes that may have additional work. This ensures that if a
+      pipeline has elements available from the root nodes it will add those
+      elements when necessary.
+
+      Args:
+        timers_fired: True if any timers fired prior to this call.
+      """
+      # If any timers have fired, they will add more work; no need to add more.
+      if timers_fired:
+        return
+
+      if self._is_executing():
+        # We have at least one executor that can proceed without adding
+        # additional work.
+        return
+
+      # All current TransformExecutors are blocked; add more work from the
+      # roots.
+      for applied_transform in self._executor.root_nodes:
+        if not self._executor.evaluation_context.is_done(applied_transform):
+          self._executor.schedule_consumption(
+              applied_transform, None,
+              self._executor.default_completion_callback)
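
(An illustrative aside: _MonitorTask above is a self-rescheduling task; it
resubmits itself to the executor service after every run until
_should_shutdown() returns True. A minimal runnable sketch of just that
pattern follows; MiniExecutorService, MonitorTask and is_done are
hypothetical stand-ins, not names from this commit:

  class MiniExecutorService(object):
    """Toy stand-in for ExecutorService: a FIFO of callables."""

    def __init__(self):
      self._tasks = []

    def submit(self, task):
      self._tasks.append(task)

    def run_to_completion(self):
      # Drain the queue; a task that resubmits itself keeps the loop alive.
      while self._tasks:
        self._tasks.pop(0)()

  class MonitorTask(object):
    """Reschedules itself until is_done() reports completion."""

    def __init__(self, service, is_done):
      self._service = service
      self._is_done = is_done
      self.iterations = 0

    def __call__(self):
      self.iterations += 1
      if not self._is_done():  # mirrors the _should_shutdown() contract
        self._service.submit(self)

  service = MiniExecutorService()
  remaining = [3]

  def is_done():
    remaining[0] -= 1
    return remaining[0] <= 0

  task = MonitorTask(service, is_done)
  service.submit(task)
  service.run_to_completion()
  assert task.iterations == 3

The real executor differs in that the pool is threaded, and the monitor also
drains the update queues and fires timers before deciding whether to
reschedule itself.)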

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
new file mode 100644
index 0000000..c732d7f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -0,0 +1,542 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An evaluator of a specific application of a transform."""
+
+from __future__ import absolute_import
+
+import collections
+import copy
+
+from apache_beam import coders
+from apache_beam import pvalue
+import apache_beam.io as io
+from apache_beam.runners.common import DoFnRunner
+from apache_beam.runners.common import DoFnState
+from apache_beam.runners.direct.watermark_manager import WatermarkManager
+from apache_beam.runners.direct.transform_result import TransformResult
+from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
+from apache_beam.transforms import core
+from apache_beam.transforms import sideinputs
+from apache_beam.transforms.window import GlobalWindows
+from apache_beam.transforms.window import WindowedValue
+from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
+from apache_beam.typehints.typecheck import TypeCheckError
+from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
+from apache_beam.utils import counters
+from apache_beam.utils.options import TypeOptions
+
+
+class TransformEvaluatorRegistry(object):
+  """Creates instances of TransformEvaluator for the application of a 
transform.
+  """
+
+  def __init__(self, evaluation_context):
+    assert evaluation_context
+    self._evaluation_context = evaluation_context
+    self._evaluators = {
+        io.Read: _BoundedReadEvaluator,
+        core.Create: _CreateEvaluator,
+        core.Flatten: _FlattenEvaluator,
+        core.ParDo: _ParDoEvaluator,
+        core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
+        sideinputs.CreatePCollectionView: _CreatePCollectionViewEvaluator,
+        _NativeWrite: _NativeWriteEvaluator,
+    }
+
+  def for_application(
+      self, applied_ptransform, input_committed_bundle, side_inputs):
+    """Returns a TransformEvaluator suitable for processing given inputs."""
+    assert applied_ptransform
+    assert bool(applied_ptransform.side_inputs) == bool(side_inputs)
+
+    # Walk up the class hierarchy to find an evaluable type. This is necessary
+    # for supporting sub-classes of core transforms.
+    for cls in applied_ptransform.transform.__class__.mro():
+      evaluator = self._evaluators.get(cls)
+      if evaluator:
+        break
+
+    if not evaluator:
+      raise NotImplementedError(
+          'Execution of [%s] not implemented in runner %s.' % (
+              type(applied_ptransform.transform), self))
+    return evaluator(self._evaluation_context, applied_ptransform,
+                     input_committed_bundle, side_inputs)
+
+  def should_execute_serially(self, applied_ptransform):
+    """Returns True if this applied_ptransform should run one bundle at a time.
+
+    Some TransformEvaluators use a global state object to keep track of their
+    global execution state. For example, the evaluator for GroupByKeyOnly uses
+    this state as an in-memory dictionary to buffer keys.
+
+    Serially executed evaluators act as a synchronization point in the graph,
+    and execution does not move forward until they receive all of their input.
+    Once they do, they release the combined output, which may consist of
+    multiple bundles as they may divide their output into pieces before
+    releasing it.
+
+    Args:
+      applied_ptransform: Transform to be used for execution.
+
+    Returns:
+      True if the executor should execute applied_ptransform serially.
+    """
+    return isinstance(applied_ptransform.transform,
+                      (core.GroupByKeyOnly, sideinputs.CreatePCollectionView,
+                       _NativeWrite))
+
+
+class _TransformEvaluator(object):
+  """An evaluator of a specific application of a transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs):
+    self._evaluation_context = evaluation_context
+    self._applied_ptransform = applied_ptransform
+    self._input_committed_bundle = input_committed_bundle
+    self._side_inputs = side_inputs
+    self._expand_outputs()
+    self._execution_context = evaluation_context.get_execution_context(
+        applied_ptransform)
+    self.start_bundle()
+
+  def _expand_outputs(self):
+    outputs = set()
+    for pval in self._applied_ptransform.outputs.values():
+      if isinstance(pval, pvalue.DoOutputsTuple):
+        pvals = (v for v in pval)
+      else:
+        pvals = (pval,)
+      for v in pvals:
+        outputs.add(v)
+    self._outputs = frozenset(outputs)
+
+  def _split_list_into_bundles(
+      self, output_pcollection, elements, max_element_per_bundle,
+      element_size_fn):
+    """Splits elements, an iterable, into multiple output bundles.
+
+    Args:
+      output_pcollection: PCollection that the elements belong to.
+      elements: elements to be chunked into bundles.
+      max_element_per_bundle: (approximately) the maximum element per bundle.
+        If it is None, only a single bundle will be produced.
+      element_size_fn: Function to return the size of a given element.
+
+    Returns:
+      List of output uncommitted bundles with at least one bundle.
+    """
+    bundle = self._evaluation_context.create_bundle(output_pcollection)
+    bundle_size = 0
+    bundles = [bundle]
+    for element in elements:
+      if max_element_per_bundle and bundle_size >= max_element_per_bundle:
+        bundle = self._evaluation_context.create_bundle(output_pcollection)
+        bundle_size = 0
+        bundles.append(bundle)
+
+      bundle.output(element)
+      bundle_size += element_size_fn(element)
+    return bundles
+
+  def start_bundle(self):
+    """Starts a new bundle."""
+    pass
+
+  def process_element(self, element):
+    """Processes a new element as part of the current bundle."""
+    raise NotImplementedError('%s does not process elements.' % type(self))
+
+  def finish_bundle(self):
+    """Finishes the bundle and produces output."""
+    pass
+
+
+class _BoundedReadEvaluator(_TransformEvaluator):
+  """TransformEvaluator for bounded Read transform."""
+
+  MAX_ELEMENT_PER_BUNDLE = 100
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs):
+    assert not input_committed_bundle
+    assert not side_inputs
+    self._source = applied_ptransform.transform.source
+    self._source.pipeline_options = evaluation_context.pipeline_options
+    super(_BoundedReadEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs)
+
+  def finish_bundle(self):
+    assert len(self._outputs) == 1
+    output_pcollection = list(self._outputs)[0]
+
+    def _read_values_to_bundles(reader):
+      read_result = [GlobalWindows.windowed_value(e) for e in reader]
+      return self._split_list_into_bundles(
+          output_pcollection, read_result,
+          _BoundedReadEvaluator.MAX_ELEMENT_PER_BUNDLE, lambda _: 1)
+
+    if isinstance(self._source, io.iobase.BoundedSource):
+      # Getting a RangeTracker for the default range of the source and reading
+      # the full source using that.
+      range_tracker = self._source.get_range_tracker(None, None)
+      reader = self._source.read(range_tracker)
+      bundles = _read_values_to_bundles(reader)
+    else:
+      with self._source.reader() as reader:
+        bundles = _read_values_to_bundles(reader)
+
+    return TransformResult(
+        self._applied_ptransform, bundles, None, None, None, None)
+
+
+class _FlattenEvaluator(_TransformEvaluator):
+  """TransformEvaluator for Flatten transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs):
+    assert not side_inputs
+    super(_FlattenEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs)
+
+  def start_bundle(self):
+    assert len(self._outputs) == 1
+    output_pcollection = list(self._outputs)[0]
+    self.bundle = self._evaluation_context.create_bundle(output_pcollection)
+
+  def process_element(self, element):
+    self.bundle.output(element)
+
+  def finish_bundle(self):
+    bundles = [self.bundle]
+    return TransformResult(
+        self._applied_ptransform, bundles, None, None, None, None)
+
+
+class _CreateEvaluator(_TransformEvaluator):
+  """TransformEvaluator for Create transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs):
+    assert not input_committed_bundle
+    assert not side_inputs
+    super(_CreateEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs)
+
+  def start_bundle(self):
+    assert len(self._outputs) == 1
+    output_pcollection = list(self._outputs)[0]
+    self.bundle = self._evaluation_context.create_bundle(output_pcollection)
+
+  def finish_bundle(self):
+    bundles = []
+    transform = self._applied_ptransform.transform
+
+    assert transform.value is not None
+    create_result = [GlobalWindows.windowed_value(v) for v in transform.value]
+    for result in create_result:
+      self.bundle.output(result)
+    bundles.append(self.bundle)
+
+    return TransformResult(
+        self._applied_ptransform, bundles, None, None, None, None)
+
+
+class _TaggedReceivers(dict):
+  """Received ParDo output and redirect to the associated output bundle."""
+
+  def __init__(self, evaluation_context):
+    self._evaluation_context = evaluation_context
+    self._null_receiver = None
+    self._undeclared_in_memory_tag_values = None
+    super(_TaggedReceivers, self).__init__()
+
+  @property
+  def undeclared_in_memory_tag_values(self):
+    assert (not self._undeclared_in_memory_tag_values
+            or self._evaluation_context.has_cache)
+    return self._undeclared_in_memory_tag_values
+
+  class NullReceiver(object):
+    """Ignores undeclared side outputs, default execution mode."""
+
+    def output(self, element):
+      pass
+
+  class InMemoryReceiver(object):
+    """Buffers undeclared side outputs to the given dictionary."""
+
+    def __init__(self, target, tag):
+      self._target = target
+      self._tag = tag
+
+    def output(self, element):
+      self._target[self._tag].append(element)
+
+  def __missing__(self, key):
+    if self._evaluation_context.has_cache:
+      if not self._undeclared_in_memory_tag_values:
+        self._undeclared_in_memory_tag_values = collections.defaultdict(list)
+      receiver = _TaggedReceivers.InMemoryReceiver(
+          self._undeclared_in_memory_tag_values, key)
+    else:
+      if not self._null_receiver:
+        self._null_receiver = _TaggedReceivers.NullReceiver()
+      receiver = self._null_receiver
+    return receiver
+
+
+class _ParDoEvaluator(_TransformEvaluator):
+  """TransformEvaluator for ParDo transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs):
+    super(_ParDoEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs)
+
+  def start_bundle(self):
+    transform = self._applied_ptransform.transform
+
+    self._tagged_receivers = _TaggedReceivers(self._evaluation_context)
+    if isinstance(self._applied_ptransform.parent.transform, core._MultiParDo):  # pylint: disable=protected-access
+      do_outputs_tuple = self._applied_ptransform.parent.outputs[0]
+      assert isinstance(do_outputs_tuple, pvalue.DoOutputsTuple)
+      main_output_pcollection = do_outputs_tuple[do_outputs_tuple._main_tag]  # pylint: disable=protected-access
+
+      for side_output_tag in transform.side_output_tags:
+        output_pcollection = do_outputs_tuple[side_output_tag]
+        self._tagged_receivers[side_output_tag] = (
+            self._evaluation_context.create_bundle(output_pcollection))
+        self._tagged_receivers[side_output_tag].tag = side_output_tag
+    else:
+      assert len(self._outputs) == 1
+      main_output_pcollection = list(self._outputs)[0]
+
+    self._tagged_receivers[None] = self._evaluation_context.create_bundle(
+        main_output_pcollection)
+    self._tagged_receivers[None].tag = None  # main_tag is None.
+
+    self._counter_factory = counters.CounterFactory()
+
+    dofn = copy.deepcopy(transform.dofn)
+
+    pipeline_options = self._evaluation_context.pipeline_options
+    if (pipeline_options is not None
+        and pipeline_options.view_as(TypeOptions).runtime_type_check):
+      dofn = TypeCheckWrapperDoFn(dofn, transform.get_type_hints())
+
+    dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label)
+    self.runner = DoFnRunner(dofn, transform.args, transform.kwargs,
+                             self._side_inputs,
+                             self._applied_ptransform.inputs[0].windowing,
+                             tagged_receivers=self._tagged_receivers,
+                             step_name=self._applied_ptransform.full_label,
+                             state=DoFnState(self._counter_factory))
+    self.runner.start()
+
+  def process_element(self, element):
+    self.runner.process(element)
+
+  def finish_bundle(self):
+    self.runner.finish()
+    bundles = self._tagged_receivers.values()
+    result_counters = self._counter_factory.get_counters()
+    return TransformResult(
+        self._applied_ptransform, bundles, None, None, result_counters, None,
+        self._tagged_receivers.undeclared_in_memory_tag_values)
+
+
+class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
+  """TransformEvaluator for GroupByKeyOnly transform."""
+
+  MAX_ELEMENT_PER_BUNDLE = None
+
+  class _GroupByKeyOnlyEvaluatorState(object):
+
+    def __init__(self):
+      # output: {} key -> [values]
+      self.output = collections.defaultdict(list)
+      self.completed = False
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs):
+    assert not side_inputs
+    super(_GroupByKeyOnlyEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs)
+
+  @property
+  def _is_final_bundle(self):
+    return (self._execution_context.watermarks.input_watermark
+            == WatermarkManager.WATERMARK_POS_INF)
+
+  def start_bundle(self):
+    self.state = (self._execution_context.existing_state
+                  if self._execution_context.existing_state
+                  else _GroupByKeyOnlyEvaluator._GroupByKeyOnlyEvaluatorState())
+
+    assert len(self._outputs) == 1
+    self.output_pcollection = list(self._outputs)[0]
+
+    # The input type of a GroupByKey will be KV[Any, Any] or more specific.
+    kv_type_hint = (
+        self._applied_ptransform.transform.get_type_hints().input_types[0])
+    self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
+
+  def process_element(self, element):
+    assert not self.state.completed
+    if (isinstance(element, WindowedValue)
+        and isinstance(element.value, collections.Iterable)
+        and len(element.value) == 2):
+      k, v = element.value
+      self.state.output[self.key_coder.encode(k)].append(v)
+    else:
+      raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of '
+                           'windowed key-value pairs. Instead received: %r.'
+                           % element)
+
+  def finish_bundle(self):
+    if self._is_final_bundle:
+      if self.state.completed:
+        # Ignore empty bundles after emitting output. (This may happen because
+        # empty bundles do not affect input watermarks.)
+        bundles = []
+      else:
+        gbk_result = (
+            map(GlobalWindows.windowed_value, (
+                (self.key_coder.decode(k), v)
+                for k, v in self.state.output.iteritems())))
+
+        def len_element_fn(element):
+          _, v = element.value
+          return len(v)
+
+        bundles = self._split_list_into_bundles(
+            self.output_pcollection, gbk_result,
+            _GroupByKeyOnlyEvaluator.MAX_ELEMENT_PER_BUNDLE, len_element_fn)
+
+      self.state.completed = True
+      state = self.state
+      hold = WatermarkManager.WATERMARK_POS_INF
+    else:
+      bundles = []
+      state = self.state
+      hold = WatermarkManager.WATERMARK_NEG_INF
+
+    return TransformResult(
+        self._applied_ptransform, bundles, state, None, None, hold)
+
+
+class _CreatePCollectionViewEvaluator(_TransformEvaluator):
+  """TransformEvaluator for CreatePCollectionView transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs):
+    assert not side_inputs
+    super(_CreatePCollectionViewEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs)
+
+  @property
+  def _is_final_bundle(self):
+    return (self._execution_context.watermarks.input_watermark
+            == WatermarkManager.WATERMARK_POS_INF)
+
+  def start_bundle(self):
+    # state: [values]
+    self.state = (self._execution_context.existing_state
+                  if self._execution_context.existing_state else [])
+
+    assert len(self._outputs) == 1
+    self.output_pcollection = list(self._outputs)[0]
+
+  def process_element(self, element):
+    self.state.append(element)
+
+  def finish_bundle(self):
+    if self._is_final_bundle:
+      bundle = self._evaluation_context.create_bundle(self.output_pcollection)
+
+      view_result = self.state
+      for result in view_result:
+        bundle.output(result)
+
+      bundles = [bundle]
+      state = None
+      hold = WatermarkManager.WATERMARK_POS_INF
+    else:
+      bundles = []
+      state = self.state
+      hold = WatermarkManager.WATERMARK_NEG_INF
+
+    return TransformResult(
+        self._applied_ptransform, bundles, state, None, None, hold)
+
+
+class _NativeWriteEvaluator(_TransformEvaluator):
+  """TransformEvaluator for _NativeWrite transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs):
+    assert not side_inputs
+    super(_NativeWriteEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs)
+
+    assert applied_ptransform.transform.sink
+    self._sink = copy.deepcopy(applied_ptransform.transform.sink)
+
+  @property
+  def _is_final_bundle(self):
+    return (self._execution_context.watermarks.input_watermark
+            == WatermarkManager.WATERMARK_POS_INF)
+
+  def start_bundle(self):
+    # state: [values]
+    self.state = (self._execution_context.existing_state
+                  if self._execution_context.existing_state else [])
+
+  def process_element(self, element):
+    self.state.append(element)
+
+  def finish_bundle(self):
+    # TODO(altay): Do not wait until the last bundle to write in a single shard.
+    if self._is_final_bundle:
+      if isinstance(self._sink, io.fileio.NativeTextFileSink):
+        assert self._sink.num_shards in (0, 1)
+        if self._sink.shard_name_template:
+          self._sink.file_path += '-00000-of-00001'
+          self._sink.file_path += self._sink.file_name_suffix
+      self._sink.pipeline_options = self._evaluation_context.pipeline_options
+      with self._sink.writer() as writer:
+        for v in self.state:
+          writer.Write(v.value)
+
+      state = None
+      hold = WatermarkManager.WATERMARK_POS_INF
+    else:
+      state = self.state
+      hold = WatermarkManager.WATERMARK_NEG_INF
+
+    return TransformResult(
+        self._applied_ptransform, [], state, None, None, hold)
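
(A note on the dispatch in TransformEvaluatorRegistry.for_application above:
it walks the transform class's method resolution order, so a subclass of a
core transform resolves to the evaluator registered for its parent class. A
minimal sketch of that lookup; Base, Derived and the evaluators dict are
hypothetical stand-ins for the real transform classes and self._evaluators:

  class Base(object):
    pass

  class Derived(Base):
    pass

  evaluators = {Base: 'BaseEvaluator'}

  def lookup(transform):
    # Walk the class hierarchy, nearest class first.
    for cls in type(transform).mro():
      evaluator = evaluators.get(cls)
      if evaluator:
        return evaluator
    raise NotImplementedError('No evaluator for %s' % type(transform))

  assert lookup(Derived()) == 'BaseEvaluator'

Derived finds Base's evaluator because Base appears in its MRO.)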

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f201cbb/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
new file mode 100644
index 0000000..298e629
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/transform_result.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""The result of evaluating an AppliedPTransform with a TransformEvaluator."""
+
+from __future__ import absolute_import
+
+
+class TransformResult(object):
+  """The result of evaluating an AppliedPTransform with a 
TransformEvaluator."""
+
+  def __init__(self, applied_ptransform, uncommitted_output_bundles, state,
+               timer_update, counters, watermark_hold,
+               undeclared_tag_values=None):
+    self._applied_ptransform = applied_ptransform
+    self._uncommitted_output_bundles = uncommitted_output_bundles
+    self._state = state
+    self._timer_update = timer_update
+    self._counters = counters
+    self._watermark_hold = watermark_hold
+    # Only used when caching (materializing) all values is requested.
+    self._undeclared_tag_values = undeclared_tag_values
+
+  @property
+  def transform(self):
+    return self._applied_ptransform
+
+  @property
+  def output_bundles(self):
+    return self._uncommitted_output_bundles
+
+  @property
+  def state(self):
+    return self._state
+
+  @property
+  def counters(self):
+    return self._counters
+
+  @property
+  def watermark_hold(self):
+    return self._watermark_hold
+
+  @property
+  def undeclared_tag_values(self):
+    return self._undeclared_tag_values
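
(One more illustrative sketch: the state and watermark_hold fields above
drive the buffering discipline shared by _GroupByKeyOnlyEvaluator,
_CreatePCollectionViewEvaluator and _NativeWriteEvaluator, which hold the
watermark at negative infinity and carry their state forward until the final
bundle, then emit and release the hold. The float constants and the
three-tuple return below are simplifications, not the real WatermarkManager
values or TransformResult shape:

  WATERMARK_NEG_INF = float('-inf')  # stand-in for WatermarkManager's value
  WATERMARK_POS_INF = float('inf')

  def finish_bundle(state, is_final_bundle):
    # Returns (output, carried_state, watermark_hold), echoing the fields
    # the real evaluators put into their TransformResult.
    if is_final_bundle:
      return state, None, WATERMARK_POS_INF
    return [], state, WATERMARK_NEG_INF

  # Non-final bundles buffer everything; the final bundle releases it.
  assert finish_bundle([1, 2], False) == ([], [1, 2], WATERMARK_NEG_INF)
  assert finish_bundle([1, 2], True) == ([1, 2], None, WATERMARK_POS_INF)

This is also why these evaluators are exactly the ones that
should_execute_serially singles out: their output only becomes available
once their input watermark reaches positive infinity.)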
