Change side inputs to be references rather than full PValues.

This is more consistent with the Runner API's structure.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/207de81b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/207de81b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/207de81b

Branch: refs/heads/master
Commit: 207de81bca4c3761cf663d32f9b95a022ef97165
Parents: 132d3c5
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Thu Mar 30 08:20:21 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Mar 31 12:11:00 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             |  12 +-
 sdks/python/apache_beam/pvalue.py               | 258 +++++++------------
 sdks/python/apache_beam/pvalue_test.py          |  33 ---
 .../runners/dataflow/dataflow_runner.py         |  29 ++-
 .../runners/direct/bundle_factory.py            |   3 +-
 .../consumer_tracking_pipeline_visitor.py       |  11 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 .../runners/direct/evaluation_context.py        |  60 +++--
 .../apache_beam/runners/direct/executor.py      |   7 +-
 .../runners/direct/transform_evaluator.py       |  51 +---
 sdks/python/apache_beam/transforms/core.py      |   2 +-
 .../python/apache_beam/transforms/ptransform.py |   4 +-
 .../python/apache_beam/transforms/sideinputs.py | 132 ----------
 .../apache_beam/transforms/sideinputs_test.py   |  91 +++----
 14 files changed, 198 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index be2a79d..ee5904b 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -142,9 +142,6 @@ class Pipeline(object):
     # If a transform is applied and the full label is already in the set
     # then the transform will have to be cloned with a new label.
     self.applied_labels = set()
-    # Store cache of views created from PCollections.  For reference, see
-    # pvalue._cache_view().
-    self._view_cache = {}
 
   def _current_transform(self):
     """Returns the transform currently on the top of the stack."""
@@ -271,8 +268,8 @@ class Pipeline(object):
         result.producer = current
       # TODO(robertwb): Multi-input, multi-output inference.
       # TODO(robertwb): Ideally we'd do intersection here.
-      if (type_options is not None and type_options.pipeline_type_check and
-          isinstance(result, (pvalue.PCollection, pvalue.PCollectionView))
+      if (type_options is not None and type_options.pipeline_type_check
+          and isinstance(result, pvalue.PCollection)
           and not result.element_type):
         input_element_type = (
             inputs[0].element_type
@@ -416,7 +413,7 @@ class AppliedPTransform(object):
         if not isinstance(main_input, pvalue.PBegin):
           real_producer(main_input).refcounts[main_input.tag] += 1
       for side_input in self.side_inputs:
-        real_producer(side_input).refcounts[side_input.tag] += 1
+        real_producer(side_input.pvalue).refcounts[side_input.pvalue.tag] += 1
 
   def add_output(self, output, tag=None):
     if isinstance(output, pvalue.DoOutputsTuple):
@@ -456,7 +453,8 @@ class AppliedPTransform(object):
 
     # Visit side inputs.
     for pval in self.side_inputs:
-      if isinstance(pval, pvalue.PCollectionView) and pval not in visited:
+      if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited:
+        pval = pval.pvalue  # Unpack marker-object-wrapped pvalue.
         assert pval.producer is not None
         pval.producer.visit(visitor, pipeline, visited)
         # The value should be visited now since we visit outputs too.

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 4114b3f..bfe1745 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -26,9 +26,10 @@ produced when the pipeline gets executed.
 
 from __future__ import absolute_import
 
-import collections
 import itertools
 
+from apache_beam import typehints
+
 
 class PValue(object):
   """Base class for PCollection.
@@ -250,20 +251,22 @@ class SideOutputValue(object):
     self.value = value
 
 
-class PCollectionView(PValue):
-  """An immutable view of a PCollection that can be used as a side input."""
+class AsSideInput(object):
+  """Marker specifying that a PCollection will be used as a side input.
 
-  def __init__(self, pipeline, window_mapping_fn):
-    """Initializes a PCollectionView. Do not call directly."""
-    super(PCollectionView, self).__init__(pipeline)
-    self._window_mapping_fn = window_mapping_fn
+  When a PCollection is supplied as a side input to a PTransform, it is
+  necessary to indicate how the PCollection should be made available
+  as a PTransform side argument (e.g. in the form of an iterable, mapping,
+  or single value).  This class is the superclass of all the various
+  options, and should not be instantiated directly. (See instead AsSingleton,
+  AsIter, etc.)
+  """
 
-  @property
-  def windowing(self):
-    if not hasattr(self, '_windowing'):
-      self._windowing = self.producer.transform.get_windowing(
-          self.producer.inputs)
-    return self._windowing
+  def __init__(self, pcoll):
+    from apache_beam.transforms import sideinputs
+    self.pvalue = pcoll
+    self._window_mapping_fn = sideinputs.default_window_mapping_fn(
+        pcoll.windowing.windowfn)
 
   def _view_options(self):
     """Internal options corresponding to specific view.
@@ -275,19 +278,39 @@ class PCollectionView(PValue):
     """
     return {'window_mapping_fn': self._window_mapping_fn}
 
+  @property
+  def element_type(self):
+    return typehints.Any
+
+
+class AsSingleton(AsSideInput):
+  """Marker specifying a PCollection to be used as a single-value side input.
+
+  When a PCollection is supplied as a side input to a PTransform, it is
+  necessary to indicate whether the entire PCollection should be made available
+  as a PTransform side argument (in the form of an iterable), or whether just
+  one value should be pulled from the PCollection and supplied as the side
+  argument (as an ordinary value).
 
-class SingletonPCollectionView(PCollectionView):
-  """A PCollectionView that contains a single object."""
+  Wrapping a PCollection side input argument to a PTransform in this container
+  (e.g., data.apply('label', MyPTransform(), AsSingleton(my_side_input) )
+  selects the latter behavior.
 
-  def __init__(self, pipeline, has_default, default_value,
-               window_mapping_fn):
-    super(SingletonPCollectionView, self).__init__(pipeline, window_mapping_fn)
-    self.has_default = has_default
+  The input PCollection must contain exactly one value per window, unless a
+  default is given, in which case it may be empty.
+  """
+  _NO_DEFAULT = object()
+
+  def __init__(self, pcoll, default_value=_NO_DEFAULT):
+    super(AsSingleton, self).__init__(pcoll)
     self.default_value = default_value
 
+  def __repr__(self):
+    return 'AsSingleton(%s)' % self.pvalue
+
   def _view_options(self):
-    base = super(SingletonPCollectionView, self)._view_options()
-    if self.has_default:
+    base = super(AsSingleton, self)._view_options()
+    if self.default_value is not AsSingleton._NO_DEFAULT:
       return dict(base, default=self.default_value)
     else:
       return base
@@ -304,182 +327,83 @@ class SingletonPCollectionView(PCollectionView):
           'PCollection with more than one element accessed as '
           'a singleton view.')
 
-
-class IterablePCollectionView(PCollectionView):
-  """A PCollectionView that can be treated as an iterable."""
-
-  @staticmethod
-  def _from_runtime_iterable(it, options):
-    return it
+  @property
+  def element_type(self):
+    return self.pvalue.element_type
 
 
-class ListPCollectionView(PCollectionView):
-  """A PCollectionView that can be treated as a list."""
+class AsIter(AsSideInput):
+  """Marker specifying that an entire PCollection is to be used as a side input.
 
-  @staticmethod
-  def _from_runtime_iterable(it, options):
-    return list(it)
+  When a PCollection is supplied as a side input to a PTransform, it is
+  necessary to indicate whether the entire PCollection should be made available
+  as a PTransform side argument (in the form of an iterable), or whether just
+  one value should be pulled from the PCollection and supplied as the side
+  argument (as an ordinary value).
 
+  Wrapping a PCollection side input argument to a PTransform in this container
+  (e.g., data.apply('label', MyPTransform(), AsIter(my_side_input) ) selects the
+  former behavior.
+  """
 
-class DictPCollectionView(PCollectionView):
-  """A PCollectionView that can be treated as a dict."""
+  def __repr__(self):
+    return 'AsIter(%s)' % self.pvalue
 
   @staticmethod
   def _from_runtime_iterable(it, options):
-    return dict(it)
-
-
-def _get_cached_view(pipeline, key):
-  return pipeline._view_cache.get(key, None)  # pylint: disable=protected-access
-
-
-def _cache_view(pipeline, key, view):
-  pipeline._view_cache[key] = view  # pylint: disable=protected-access
-
-
-def _format_view_label(pcoll):
-  # The monitoring UI doesn't like '/' character in transform labels.
-  if not pcoll.producer:
-    return str(pcoll.tag)
-  return '%s.%s' % (pcoll.producer.full_label.replace('/', '|'),
-                    pcoll.tag)
-
-
-_SINGLETON_NO_DEFAULT = object()
-
+    return it
 
-def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None):  # pylint: disable=invalid-name
-  """Create a SingletonPCollectionView from the contents of input PCollection.
+  @property
+  def element_type(self):
+    return typehints.Iterable[self.pvalue.element_type]
 
-  The input PCollection should contain at most one element (per window) and the
-  resulting PCollectionView can then be used as a side input to PTransforms. If
-  the PCollectionView is empty (for a given window), the side input value will
-  be the default_value, if specified; otherwise, it will be an EmptySideInput
-  object.
 
-  Args:
-    pcoll: Input pcollection.
-    default_value: Default value for the singleton view.
-    label: Label to be specified if several AsSingleton's with different
-      defaults for the same PCollection.
+class AsList(AsSideInput):
+  """Marker specifying that an entire PCollection is to be used as a side input.
 
-  Returns:
-    A singleton PCollectionView containing the element as above.
-  """
-  label = label or _format_view_label(pcoll)
-  has_default = default_value is not _SINGLETON_NO_DEFAULT
-  if not has_default:
-    default_value = None
-
-  # Don't recreate the view if it was already created.
-  hashable_default_value = ('val', default_value)
-  if not isinstance(default_value, collections.Hashable):
-    # Massage default value to treat as hash key.
-    hashable_default_value = ('id', id(default_value))
-  cache_key = (pcoll, AsSingleton, has_default, hashable_default_value)
-  cached_view = _get_cached_view(pcoll.pipeline, cache_key)
-  if cached_view:
-    return cached_view
-
-  # Local import is required due to dependency loop; even though the
-  # implementation of this function requires concepts defined in modules that
-  # depend on pvalue, it lives in this module to reduce user workload.
-  from apache_beam.transforms import sideinputs  # pylint: disable=wrong-import-order, wrong-import-position
-  view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value,
-                                             label=label))
-  _cache_view(pcoll.pipeline, cache_key, view)
-  return view
-
-
-def AsIter(pcoll, label=None):  # pylint: disable=invalid-name
-  """Create an IterablePCollectionView from the elements of input PCollection.
-
-  The contents of the given PCollection will be available as an iterable in
-  PTransforms that use the returned PCollectionView as a side input.
+  Intended for use in side-argument specification---the same places where
+  AsSingleton and AsIter are used, but forces materialization of this
+  PCollection as a list.
 
   Args:
     pcoll: Input pcollection.
-    label: Label to be specified if several AsIter's for the same PCollection.
 
   Returns:
-    An iterable PCollectionView containing the elements as above.
+    An AsList-wrapper around a PCollection whose one element is a list
+    containing all elements in pcoll.
   """
-  label = label or _format_view_label(pcoll)
-
-  # Don't recreate the view if it was already created.
-  cache_key = (pcoll, AsIter)
-  cached_view = _get_cached_view(pcoll.pipeline, cache_key)
-  if cached_view:
-    return cached_view
 
-  # Local import is required due to dependency loop; even though the
-  # implementation of this function requires concepts defined in modules that
-  # depend on pvalue, it lives in this module to reduce user workload.
-  from apache_beam.transforms import sideinputs  # pylint: disable=wrong-import-order, wrong-import-position
-  view = (pcoll | sideinputs.ViewAsIterable(label=label))
-  _cache_view(pcoll.pipeline, cache_key, view)
-  return view
+  @staticmethod
+  def _from_runtime_iterable(it, options):
+    return list(it)
 
 
-def AsList(pcoll, label=None):  # pylint: disable=invalid-name
-  """Create a ListPCollectionView from the elements of input PCollection.
+class AsDict(AsSideInput):
+  """Marker specifying a PCollection to be used as an indexable side input.
 
-  The contents of the given PCollection will be available as a list-like object
-  in PTransforms that use the returned PCollectionView as a side input.
+  Intended for use in side-argument specification---the same places where
+  AsSingleton and AsIter are used, but returns an interface that allows
+  key lookup.
 
   Args:
-    pcoll: Input pcollection.
-    label: Label to be specified if several AsList's for the same PCollection.
+    pcoll: Input pcollection. All elements should be key-value pairs (i.e.
+       2-tuples) with unique keys.
 
   Returns:
-    A list PCollectionView containing the elements as above.
+    An AsDict-wrapper around a PCollection whose one element is a dict with
+      entries for uniquely-keyed pairs in pcoll.
   """
-  label = label or _format_view_label(pcoll)
-
-  # Don't recreate the view if it was already created.
-  cache_key = (pcoll, AsList)
-  cached_view = _get_cached_view(pcoll.pipeline, cache_key)
-  if cached_view:
-    return cached_view
-
-  # Local import is required due to dependency loop; even though the
-  # implementation of this function requires concepts defined in modules that
-  # depend on pvalue, it lives in this module to reduce user workload.
-  from apache_beam.transforms import sideinputs  # pylint: disable=wrong-import-order, wrong-import-position
-  view = (pcoll | sideinputs.ViewAsList(label=label))
-  _cache_view(pcoll.pipeline, cache_key, view)
-  return view
-
 
-def AsDict(pcoll, label=None):  # pylint: disable=invalid-name
-  """Create a DictPCollectionView from the elements of input PCollection.
+  @staticmethod
+  def _from_runtime_iterable(it, options):
+    return dict(it)
 
-  The contents of the given PCollection whose elements are 2-tuples of key and
-  value will be available as a dict-like object in PTransforms that use the
-  returned PCollectionView as a side input.
 
-  Args:
-    pcoll: Input pcollection containing 2-tuples of key and value.
-    label: Label to be specified if several AsDict's for the same PCollection.
-
-  Returns:
-    A dict PCollectionView containing the dict as above.
-  """
-  label = label or _format_view_label(pcoll)
-
-  # Don't recreate the view if it was already created.
-  cache_key = (pcoll, AsDict)
-  cached_view = _get_cached_view(pcoll.pipeline, cache_key)
-  if cached_view:
-    return cached_view
-
-  # Local import is required due to dependency loop; even though the
-  # implementation of this function requires concepts defined in modules that
-  # depend on pvalue, it lives in this module to reduce user workload.
-  from apache_beam.transforms import sideinputs  # pylint: disable=wrong-import-order, wrong-import-position
-  view = (pcoll | sideinputs.ViewAsDict(label=label))
-  _cache_view(pcoll.pipeline, cache_key, view)
-  return view
+# For backwards compatibility with worker code.
+SingletonPCollectionView = AsSingleton
+IterablePCollectionView = AsIter
+ListPCollectionView = AsList
+DictPCollectionView = AsDict
 
 
 class EmptySideInput(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index 86f1987..529ddf7 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -19,22 +19,8 @@
 
 import unittest
 
-from apache_beam.pipeline import Pipeline
-from apache_beam.pvalue import AsDict
-from apache_beam.pvalue import AsIter
-from apache_beam.pvalue import AsList
-from apache_beam.pvalue import AsSingleton
 from apache_beam.pvalue import PValue
 from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms import Create
-
-
-class FakePipeline(Pipeline):
-  """Fake pipeline object used to check if apply() receives correct args."""
-
-  def apply(self, *args, **kwargs):
-    self.args = args
-    self.kwargs = kwargs
 
 
 class PValueTest(unittest.TestCase):
@@ -44,25 +30,6 @@ class PValueTest(unittest.TestCase):
     value = PValue(pipeline)
     self.assertEqual(pipeline, value.pipeline)
 
-  def test_pcollectionview_not_recreated(self):
-    pipeline = TestPipeline()
-    value = pipeline | 'create1' >> Create([1, 2, 3])
-    value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
-    value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])
-    self.assertEqual(AsSingleton(value), AsSingleton(value))
-    self.assertEqual(AsSingleton(value, default_value=1, label='new'),
-                     AsSingleton(value, default_value=1, label='new'))
-    self.assertNotEqual(AsSingleton(value),
-                        AsSingleton(value, default_value=1, label='new'))
-    self.assertEqual(AsIter(value), AsIter(value))
-    self.assertEqual(AsList(value), AsList(value))
-    self.assertEqual(AsDict(value2), AsDict(value2))
-
-    self.assertNotEqual(AsSingleton(value), AsSingleton(value2))
-    self.assertNotEqual(AsIter(value), AsIter(value2))
-    self.assertNotEqual(AsList(value), AsList(value2))
-    self.assertNotEqual(AsDict(value2), AsDict(value3))
-
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index a82671c..db433df 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -31,7 +31,7 @@ from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam.internal import pickler
 from apache_beam.internal.gcp import json_value
-from apache_beam.pvalue import PCollectionView
+from apache_beam.pvalue import AsSideInput
 from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
@@ -284,23 +284,26 @@ class DataflowRunner(PipelineRunner):
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
-  def run_CreatePCollectionView(self, transform_node):
-    step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON,
-                          transform_node.full_label, transform_node)
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
+  def _add_singleton_step(self, label, full_label, tag, input_step):
+    """Creates a CollectionToSingleton step used to handle ParDo side inputs."""
+    # Import here to avoid adding the dependency for local running scenarios.
+    from apache_beam.runners.dataflow.internal import apiclient
+    step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label)
+    self.job.proto.steps.append(step.proto)
+    step.add_property(PropertyNames.USER_NAME, full_label)
     step.add_property(
         PropertyNames.PARALLEL_INPUT,
         {'@type': 'OutputReference',
          PropertyNames.STEP_NAME: input_step.proto.name,
-         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+         PropertyNames.OUTPUT_NAME: input_step.get_output(tag)})
     step.encoding = self._get_side_input_encoding(input_step.encoding)
     step.add_property(
         PropertyNames.OUTPUT_INFO,
         [{PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+            '%s.%s' % (full_label, PropertyNames.OUTPUT)),
           PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}])
+    return step
 
   def run_Flatten(self, transform_node):
     step = self._add_step(TransformNames.FLATTEN,
@@ -375,8 +378,12 @@ class DataflowRunner(PipelineRunner):
       si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
     lookup_label = lambda side_pval: si_labels[side_pval]
     for side_pval in transform_node.side_inputs:
-      assert isinstance(side_pval, PCollectionView)
-      si_label = lookup_label(side_pval)
+      assert isinstance(side_pval, AsSideInput)
+      si_label = self._get_unique_step_name()
+      si_full_label = '%s/%s' % (transform_node.full_label, si_label)
+      self._add_singleton_step(
+          si_label, si_full_label, side_pval.pvalue.tag,
+          self._cache.get_pvalue(side_pval.pvalue))
       si_dict[si_label] = {
           '@type': 'OutputReference',
           PropertyNames.STEP_NAME: si_label,

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 63319af..647b5f2 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -106,8 +106,7 @@ class Bundle(object):
                             self._initial_windowed_value.windows)
 
   def __init__(self, pcollection, stacked=True):
-    assert (isinstance(pcollection, pvalue.PCollection)
-            or isinstance(pcollection, pvalue.PCollectionView))
+    assert isinstance(pcollection, pvalue.PCollection)
     self._pcollection = pcollection
     self._elements = []
     self._stacked = stacked

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
index 6f1757a..cdfadb7 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
@@ -34,18 +34,13 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor):
   def __init__(self):
     self.value_to_consumers = {}  # Map from PValue to [AppliedPTransform].
     self.root_transforms = set()  # set of (root) AppliedPTransforms.
-    self.views = []               # list of PCollectionViews.
+    self.views = []               # list of side inputs.
     self.step_names = {}          # Map from AppliedPTransform to String.
 
     self._num_transforms = 0
 
-  def visit_value(self, value, producer_node):
-    if value:
-      if isinstance(value, pvalue.PCollectionView):
-        self.views.append(value)
-
   def visit_transform(self, applied_ptransform):
-    inputs = applied_ptransform.inputs
+    inputs = list(applied_ptransform.inputs)
     if inputs:
       for input_value in inputs:
         if isinstance(input_value, pvalue.PBegin):
@@ -57,3 +52,5 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor):
       self.root_transforms.add(applied_ptransform)
     self.step_names[applied_ptransform] = 's%d' % (self._num_transforms)
     self._num_transforms += 1
+    for side_input in applied_ptransform.side_inputs:
+      self.views.append(side_input)

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 73b897f..eb8b14b 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -102,10 +102,10 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
     root_transforms = sorted(
         [t.transform for t in self.visitor.root_transforms])
     self.assertEqual(root_transforms, sorted([root_create]))
-    self.assertEqual(len(self.visitor.step_names), 4)
+    self.assertEqual(len(self.visitor.step_names), 3)
     self.assertEqual(len(self.visitor.views), 1)
     self.assertTrue(isinstance(self.visitor.views[0],
-                               pvalue.ListPCollectionView))
+                               pvalue.AsList))
 
   def test_co_group_by_key(self):
     emails = self.pipeline | 'email' >> Create([('joe', 'j...@example.com')])

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 7a7f318..8114104 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -50,12 +50,13 @@ class _SideInputView(object):
   def __init__(self, view):
     self._view = view
     self.callable_queue = collections.deque()
+    self.elements = []
     self.value = None
     self.has_result = False
 
 
 class _SideInputsContainer(object):
-  """An in-process container for PCollectionViews.
+  """An in-process container for side inputs.
 
   It provides methods for blocking until a side-input is available and writing
   to a side input.
@@ -67,21 +68,28 @@ class _SideInputsContainer(object):
     for view in views:
       self._views[view] = _SideInputView(view)
 
-  def get_value_or_schedule_after_output(self, pcollection_view, task):
+  def get_value_or_schedule_after_output(self, side_input, task):
     with self._lock:
-      view = self._views[pcollection_view]
+      view = self._views[side_input]
       if not view.has_result:
         view.callable_queue.append(task)
         task.blocked = True
       return (view.has_result, view.value)
 
-  def set_value_and_get_callables(self, pcollection_view, values):
+  def add_values(self, side_input, values):
     with self._lock:
-      view = self._views[pcollection_view]
+      view = self._views[side_input]
+      assert not view.has_result
+      view.elements.extend(values)
+
+  def finalize_value_and_get_tasks(self, side_input):
+    with self._lock:
+      view = self._views[side_input]
       assert not view.has_result
       assert view.value is None
       assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(pcollection_view, values)
+      view.value = self._pvalue_to_value(side_input, view.elements)
+      view.elements = None
       result = tuple(view.callable_queue)
       for task in result:
         task.blocked = False
@@ -90,10 +98,10 @@ class _SideInputsContainer(object):
       return result
 
   def _pvalue_to_value(self, view, values):
-    """Given a PCollectionView, returns the associated value in requested form.
+    """Given a side input view, returns the associated value in requested form.
 
     Args:
-      view: PCollectionView for the requested side input.
+      view: SideInput for the requested side input.
       values: Iterable values associated with the side input.
 
     Returns:
@@ -115,9 +123,9 @@ class EvaluationContext(object):
   EvaluationContext contains shared state for an execution of the
   DirectRunner that can be used while evaluating a PTransform. This
   consists of views into underlying state and watermark implementations, access
-  to read and write PCollectionViews, and constructing counter sets and
+  to read and write side inputs, and constructing counter sets and
   execution contexts. This includes executing callbacks asynchronously when
-  state changes to the appropriate point (e.g. when a PCollectionView is
+  state changes to the appropriate point (e.g. when a side input is
   requested and known to be empty).
 
   EvaluationContext also handles results by committing finalizing
@@ -134,6 +142,9 @@ class EvaluationContext(object):
     self._value_to_consumers = value_to_consumers
     self._step_names = step_names
     self.views = views
+    self._pcollection_to_views = collections.defaultdict(list)
+    for view in views:
+      self._pcollection_to_views[view.pvalue].append(view)
 
     # AppliedPTransform -> Evaluator specific state objects
     self._application_state_interals = {}
@@ -198,17 +209,20 @@ class EvaluationContext(object):
 
       # If the result is for a view, update side inputs container.
       if (result.output_bundles
-          and result.output_bundles[0].pcollection in self.views):
-        if committed_bundles:
-          assert len(committed_bundles) == 1
-          # side_input must be materialized.
-          side_input_result = committed_bundles[0].get_elements_iterable(
-              make_copy=True)
-        else:
-          side_input_result = []
-        tasks = self._side_inputs_container.set_value_and_get_callables(
-            result.output_bundles[0].pcollection, side_input_result)
-        self._pending_unblocked_tasks.extend(tasks)
+          and result.output_bundles[0].pcollection
+          in self._pcollection_to_views):
+        for view in self._pcollection_to_views[
+            result.output_bundles[0].pcollection]:
+          for committed_bundle in committed_bundles:
+            # side_input must be materialized.
+            self._side_inputs_container.add_values(
+                view,
+                committed_bundle.get_elements_iterable(make_copy=True))
+          if (self.get_execution_context(result.transform)
+              .watermarks.input_watermark
+              == WatermarkManager.WATERMARK_POS_INF):
+            self._pending_unblocked_tasks.extend(
+                self._side_inputs_container.finalize_value_and_get_tasks(view))
 
       if result.counters:
         for counter in result.counters:
@@ -277,7 +291,7 @@ class EvaluationContext(object):
     tw = self._watermark_manager.get_watermarks(transform)
     return tw.output_watermark == WatermarkManager.WATERMARK_POS_INF
 
-  def get_value_or_schedule_after_output(self, pcollection_view, task):
+  def get_value_or_schedule_after_output(self, side_input, task):
     assert isinstance(task, TransformExecutor)
     return self._side_inputs_container.get_value_or_schedule_after_output(
-        pcollection_view, task)
+        side_input, task)

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 27b6f2f..ce6356c 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -76,6 +76,7 @@ class ExecutorService(object):
         return None
 
     def run(self):
+
       while not self.shutdown_requested:
         task = self._get_task_or_none()
         if task:
@@ -249,9 +250,9 @@ class _TimerCompletionCallback(_CompletionCallback):
 class TransformExecutor(ExecutorService.CallableTask):
   """TransformExecutor will evaluate a bundle using an applied ptransform.
 
-  A CallableTask responsible for constructing a TransformEvaluator andevaluating
-  it on some bundle of input, and registering the result using the completion
-  callback.
+  A CallableTask responsible for constructing a TransformEvaluator and
+  evaluating it on some bundle of input, and registering the result using the
+  completion callback.
   """
 
   def __init__(self, transform_evaluator_registry, evaluation_context,

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index f9a0692..0c35d99 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -31,7 +31,6 @@ from apache_beam.runners.direct.watermark_manager import WatermarkManager
 from apache_beam.runners.direct.transform_result import TransformResult
 from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
 from apache_beam.transforms import core
-from apache_beam.transforms import sideinputs
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
@@ -54,7 +53,6 @@ class TransformEvaluatorRegistry(object):
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
         core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
-        sideinputs.CreatePCollectionView: _CreatePCollectionViewEvaluator,
         _NativeWrite: _NativeWriteEvaluator,
     }
 
@@ -100,8 +98,7 @@ class TransformEvaluatorRegistry(object):
       True if executor should execute applied_ptransform serially.
     """
     return isinstance(applied_ptransform.transform,
-                      (core.GroupByKeyOnly, sideinputs.CreatePCollectionView,
-                       _NativeWrite))
+                      (core.GroupByKeyOnly, _NativeWrite))
 
 
 class _TransformEvaluator(object):
@@ -444,52 +441,6 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
         self._applied_ptransform, bundles, state, None, None, hold)
 
 
-class _CreatePCollectionViewEvaluator(_TransformEvaluator):
-  """TransformEvaluator for CreatePCollectionView transform."""
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not side_inputs
-    super(_CreatePCollectionViewEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-  @property
-  def _is_final_bundle(self):
-    return (self._execution_context.watermarks.input_watermark
-            == WatermarkManager.WATERMARK_POS_INF)
-
-  def start_bundle(self):
-    # state: [values]
-    self.state = (self._execution_context.existing_state
-                  if self._execution_context.existing_state else [])
-
-    assert len(self._outputs) == 1
-    self.output_pcollection = list(self._outputs)[0]
-
-  def process_element(self, element):
-    self.state.append(element)
-
-  def finish_bundle(self):
-    if self._is_final_bundle:
-      bundle = self._evaluation_context.create_bundle(self.output_pcollection)
-
-      view_result = self.state
-      for result in view_result:
-        bundle.output(result)
-
-      bundles = [bundle]
-      state = None
-      hold = WatermarkManager.WATERMARK_POS_INF
-    else:
-      bundles = []
-      state = self.state
-      hold = WatermarkManager.WATERMARK_NEG_INF
-
-    return TransformResult(
-        self._applied_ptransform, bundles, state, None, None, hold)
-
-
 class _NativeWriteEvaluator(_TransformEvaluator):
   """TransformEvaluator for _NativeWrite transform."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 7a52828..88fdec8 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -879,7 +879,7 @@ class CombineGlobally(PTransform):
           else CombineFn.from_callable(self.fn))
       default_value = combine_fn.apply([], *self.args, **self.kwargs)
     else:
-      default_value = pvalue._SINGLETON_NO_DEFAULT  # pylint: disable=protected-access
+      default_value = pvalue.AsSingleton._NO_DEFAULT  # pylint: disable=protected-access
     view = pvalue.AsSingleton(combined, default_value=default_value)
     if self.as_view:
       return view

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index aca5822..93d751d 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -463,7 +463,7 @@ class PTransformWithSideInputs(PTransform):
           'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
           'PCollection is to be used.')
     self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args(
-        args, kwargs, pvalue.PCollectionView)
+        args, kwargs, pvalue.AsSideInput)
     self.raw_side_inputs = args, kwargs
 
     # Prevent name collisions with fns of the form '<function <lambda> at ...>'
@@ -519,7 +519,7 @@ class PTransformWithSideInputs(PTransform):
       args, kwargs = self.raw_side_inputs
 
       def element_type(side_input):
-        if isinstance(side_input, pvalue.PCollectionView):
+        if isinstance(side_input, pvalue.AsSideInput):
           return side_input.element_type
         else:
           return instance_to_type(side_input)

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 46731bf..1de7bac 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -24,140 +24,8 @@ AsSingleton, AsIter, AsList and AsDict in apache_beam.pvalue.
 
 from __future__ import absolute_import
 
-from apache_beam import pvalue
-from apache_beam import typehints
-from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms import window
 
-# Type variables
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
-
-
-class CreatePCollectionView(PTransform):
-  """Transform to materialize a given PCollectionView in the pipeline.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers.
-  """
-
-  def __init__(self, view):
-    self.view = view
-    super(CreatePCollectionView, self).__init__()
-
-  def infer_output_type(self, input_type):
-    # TODO(ccy): Figure out if we want to create a new type of type hint, i.e.,
-    # typehints.View[...].
-    return input_type
-
-  def expand(self, pcoll):
-    return self.view
-
-
-class ViewAsSingleton(PTransform):
-  """Transform to view PCollection as a singleton PCollectionView.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers. Use pvalue.AsSingleton(...) instead.
-  """
-
-  def __init__(self, has_default, default_value, label=None):
-    if label:
-      label = 'ViewAsSingleton(%s)' % label
-    super(ViewAsSingleton, self).__init__(label=label)
-    self.has_default = has_default
-    self.default_value = default_value
-
-  def expand(self, pcoll):
-    self._check_pcollection(pcoll)
-    input_type = pcoll.element_type
-    output_type = input_type
-    return (pcoll
-            | CreatePCollectionView(
-                pvalue.SingletonPCollectionView(
-                    pcoll.pipeline, self.has_default, self.default_value,
-                    default_window_mapping_fn(pcoll.windowing.windowfn)))
-            .with_input_types(input_type)
-            .with_output_types(output_type))
-
-
-class ViewAsIterable(PTransform):
-  """Transform to view PCollection as an iterable PCollectionView.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers. Use pvalue.AsIter(...) instead.
-  """
-
-  def __init__(self, label=None):
-    if label:
-      label = 'ViewAsIterable(%s)' % label
-    super(ViewAsIterable, self).__init__(label=label)
-
-  def expand(self, pcoll):
-    self._check_pcollection(pcoll)
-    input_type = pcoll.element_type
-    output_type = typehints.Iterable[input_type]
-    return (pcoll
-            | CreatePCollectionView(
-                pvalue.IterablePCollectionView(
-                    pcoll.pipeline,
-                    default_window_mapping_fn(pcoll.windowing.windowfn)))
-            .with_input_types(input_type)
-            .with_output_types(output_type))
-
-
-class ViewAsList(PTransform):
-  """Transform to view PCollection as a list PCollectionView.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers. Use pvalue.AsList(...) instead.
-  """
-
-  def __init__(self, label=None):
-    if label:
-      label = 'ViewAsList(%s)' % label
-    super(ViewAsList, self).__init__(label=label)
-
-  def expand(self, pcoll):
-    self._check_pcollection(pcoll)
-    input_type = pcoll.element_type
-    output_type = typehints.List[input_type]
-    return (pcoll
-            | CreatePCollectionView(pvalue.ListPCollectionView(
-                pcoll.pipeline,
-                default_window_mapping_fn(pcoll.windowing.windowfn)))
-            .with_input_types(input_type)
-            .with_output_types(output_type))
-
-
-@typehints.with_input_types(typehints.Tuple[K, V])
-@typehints.with_output_types(typehints.Dict[K, V])
-class ViewAsDict(PTransform):
-  """Transform to view PCollection as a dict PCollectionView.
-
-  Important: this transform is an implementation detail and should not be used
-  directly by pipeline writers. Use pvalue.AsDict(...) instead.
-  """
-
-  def __init__(self, label=None):
-    if label:
-      label = 'ViewAsDict(%s)' % label
-    super(ViewAsDict, self).__init__(label=label)
-
-  def expand(self, pcoll):
-    self._check_pcollection(pcoll)
-    input_type = pcoll.element_type
-    key_type, value_type = (
-        typehints.trivial_inference.key_value_types(input_type))
-    output_type = typehints.Dict[key_type, value_type]
-    return (pcoll
-            | CreatePCollectionView(
-                pvalue.DictPCollectionView(
-                    pcoll.pipeline,
-                    default_window_mapping_fn(pcoll.windowing.windowfn)))
-            .with_input_types(input_type)
-            .with_output_types(output_type))
-
 
 # Top-level function so we can identify it later.
 def _global_window_mapping_fn(w, global_window=window.GlobalWindow()):

http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 9278f4b..53669de 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -186,8 +186,8 @@ class SideInputsTest(unittest.TestCase):
     main_input = pipeline | 'main input' >> beam.Create([1])
     side_list = pipeline | 'side list' >> beam.Create(a_list)
     side_pairs = pipeline | 'side pairs' >> beam.Create(some_pairs)
-    results = main_input | 'concatenate' >> beam.FlatMap(
-        lambda x, the_list, the_dict: [[x, the_list, the_dict]],
+    results = main_input | 'concatenate' >> beam.Map(
+        lambda x, the_list, the_dict: [x, the_list, the_dict],
         beam.pvalue.AsList(side_list), beam.pvalue.AsDict(side_pairs))
 
     def  matcher(expected_elem, expected_list, expected_pairs):
@@ -205,13 +205,13 @@ class SideInputsTest(unittest.TestCase):
   def test_as_singleton_without_unique_labels(self):
     # This should succeed as calling beam.pvalue.AsSingleton on the same
     # PCollection twice with the same defaults will return the same
-    # PCollectionView.
+    # view.
     a_list = [2]
     pipeline = self.create_pipeline()
     main_input = pipeline | 'main input' >> beam.Create([1])
     side_list = pipeline | 'side list' >> beam.Create(a_list)
-    results = main_input | beam.FlatMap(
-        lambda x, s1, s2: [[x, s1, s2]],
+    results = main_input | beam.Map(
+        lambda x, s1, s2: [x, s1, s2],
         beam.pvalue.AsSingleton(side_list), beam.pvalue.AsSingleton(side_list))
 
     def  matcher(expected_elem, expected_singleton):
@@ -226,34 +226,15 @@ class SideInputsTest(unittest.TestCase):
     pipeline.run()
 
   @attr('ValidatesRunner')
-  def test_as_singleton_with_different_defaults_without_unique_labels(self):
-    # This should fail as beam.pvalue.AsSingleton with distinct default values
-    # should beam.Create distinct PCollectionViews with the same full_label.
-    a_list = [2]
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_list = pipeline | 'side list' >> beam.Create(a_list)
-
-    with self.assertRaises(RuntimeError) as e:
-      _ = main_input | beam.FlatMap(
-          lambda x, s1, s2: [[x, s1, s2]],
-          beam.pvalue.AsSingleton(side_list),
-          beam.pvalue.AsSingleton(side_list, default_value=3))
-    self.assertTrue(
-        e.exception.message.startswith(
-            'Transform "ViewAsSingleton(side list.None)" does not have a '
-            'stable unique label.'))
-
-  @attr('ValidatesRunner')
-  def test_as_singleton_with_different_defaults_with_unique_labels(self):
+  def test_as_singleton_with_different_defaults(self):
     a_list = []
     pipeline = self.create_pipeline()
     main_input = pipeline | 'main input' >> beam.Create([1])
     side_list = pipeline | 'side list' >> beam.Create(a_list)
-    results = main_input | beam.FlatMap(
-        lambda x, s1, s2: [[x, s1, s2]],
-        beam.pvalue.AsSingleton(side_list, default_value=2, label='si1'),
-        beam.pvalue.AsSingleton(side_list, default_value=3, label='si2'))
+    results = main_input | beam.Map(
+        lambda x, s1, s2: [x, s1, s2],
+        beam.pvalue.AsSingleton(side_list, default_value=2),
+        beam.pvalue.AsSingleton(side_list, default_value=3))
 
     def  matcher(expected_elem, expected_singleton1, expected_singleton2):
       def match(actual):
@@ -267,15 +248,15 @@ class SideInputsTest(unittest.TestCase):
     pipeline.run()
 
   @attr('ValidatesRunner')
-  def test_as_list_without_unique_labels(self):
+  def test_as_list_twice(self):
     # This should succeed as calling beam.pvalue.AsList on the same
-    # PCollection twice will return the same PCollectionView.
+    # PCollection twice will return the same view.
     a_list = [1, 2, 3]
     pipeline = self.create_pipeline()
     main_input = pipeline | 'main input' >> beam.Create([1])
     side_list = pipeline | 'side list' >> beam.Create(a_list)
-    results = main_input | beam.FlatMap(
-        lambda x, ls1, ls2: [[x, ls1, ls2]],
+    results = main_input | beam.Map(
+        lambda x, ls1, ls2: [x, ls1, ls2],
         beam.pvalue.AsList(side_list), beam.pvalue.AsList(side_list))
 
     def  matcher(expected_elem, expected_list):
@@ -290,37 +271,15 @@ class SideInputsTest(unittest.TestCase):
     pipeline.run()
 
   @attr('ValidatesRunner')
-  def test_as_list_with_unique_labels(self):
-    a_list = [1, 2, 3]
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_list = pipeline | 'side list' >> beam.Create(a_list)
-    results = main_input | beam.FlatMap(
-        lambda x, ls1, ls2: [[x, ls1, ls2]],
-        beam.pvalue.AsList(side_list),
-        beam.pvalue.AsList(side_list, label='label'))
-
-    def  matcher(expected_elem, expected_list):
-      def match(actual):
-        [[actual_elem, actual_list1, actual_list2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_list)(actual_list1)
-        equal_to(expected_list)(actual_list2)
-      return match
-
-    assert_that(results, matcher(1, [1, 2, 3]))
-    pipeline.run()
-
-  @attr('ValidatesRunner')
-  def test_as_dict_with_unique_labels(self):
+  def test_as_dict_twice(self):
     some_kvs = [('a', 1), ('b', 2)]
     pipeline = self.create_pipeline()
     main_input = pipeline | 'main input' >> beam.Create([1])
     side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs)
-    results = main_input | beam.FlatMap(
-        lambda x, dct1, dct2: [[x, dct1, dct2]],
+    results = main_input | beam.Map(
+        lambda x, dct1, dct2: [x, dct1, dct2],
         beam.pvalue.AsDict(side_kvs),
-        beam.pvalue.AsDict(side_kvs, label='label'))
+        beam.pvalue.AsDict(side_kvs))
 
     def  matcher(expected_elem, expected_kvs):
       def match(actual):
@@ -333,6 +292,20 @@ class SideInputsTest(unittest.TestCase):
     assert_that(results, matcher(1, some_kvs))
     pipeline.run()
 
+  @attr('ValidatesRunner')
+  def test_flattened_side_input(self):
+    pipeline = self.create_pipeline()
+    main_input = pipeline | 'main input' >> beam.Create([None])
+    side_input = (
+        pipeline | 'side1' >> beam.Create(['a']),
+        pipeline | 'side2' >> beam.Create(['b'])) | beam.Flatten()
+    results = main_input | beam.FlatMap(
+        lambda _, ab: ab,
+        beam.pvalue.AsList(side_input))
+
+    assert_that(results, equal_to(['a', 'b']))
+    pipeline.run()
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)

Reply via email to