[ https://issues.apache.org/jira/browse/BEAM-2927?focusedWorklogId=81897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81897 ]

ASF GitHub Bot logged work on BEAM-2927:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Mar/18 15:57
            Start Date: 19/Mar/18 15:57
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #4781: [BEAM-2927] Python support for portable side inputs over Fn API
URL: https://github.com/apache/beam/pull/4781#discussion_r175249349
 
 

 ##########
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##########
 @@ -211,43 +211,68 @@ def visit_transform(self, transform_node):
         from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly
         if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)):
           pcoll = transform_node.inputs[0]
-          input_type = pcoll.element_type
-          # If input_type is not specified, then treat it as `Any`.
-          if not input_type:
-            input_type = typehints.Any
-
-          def coerce_to_kv_type(element_type):
-            if isinstance(element_type, typehints.TupleHint.TupleConstraint):
-              if len(element_type.tuple_types) == 2:
-                return element_type
-              else:
-                raise ValueError(
-                    "Tuple input to GroupByKey must be have two components. "
-                    "Found %s for %s" % (element_type, pcoll))
-            elif isinstance(input_type, typehints.AnyTypeConstraint):
-              # `Any` type needs to be replaced with a KV[Any, Any] to
-              # force a KV coder as the main output coder for the pcollection
-              # preceding a GroupByKey.
-              return typehints.KV[typehints.Any, typehints.Any]
-            elif isinstance(element_type, typehints.UnionConstraint):
-              union_types = [
-                  coerce_to_kv_type(t) for t in element_type.union_types]
-              return typehints.KV[
-                  typehints.Union[tuple(t.tuple_types[0] for t in union_types)],
-                  typehints.Union[tuple(t.tuple_types[1] for t in union_types)]]
-            else:
-              # TODO: Possibly handle other valid types.
-              raise ValueError(
-                  "Input to GroupByKey must be of Tuple or Any type. "
-                  "Found %s for %s" % (element_type, pcoll))
-          pcoll.element_type = coerce_to_kv_type(input_type)
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
           key_type, value_type = pcoll.element_type.tuple_types
           if transform_node.outputs:
             transform_node.outputs[None].element_type = typehints.KV[
                 key_type, typehints.Iterable[value_type]]
 
     return GroupByKeyInputVisitor()
 
+  @staticmethod
+  def side_input_visitor():
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam.transforms.core import ParDo
+
+    class SideInputVisitor(PipelineVisitor):
+      """Ensures input `PCollection`s used as side inputs have a `KV` type.
+
+      TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
+      we could directly replace the coder instead of mutating the element type.
+      """
+      def visit_transform(self, transform_node):
+        if isinstance(transform_node.transform, ParDo):
+          new_side_inputs = []
+          for ix, side_input in enumerate(transform_node.side_inputs):
+            access_pattern = side_input._side_input_data().access_pattern
+            if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
+              # Add a map to ('', value) as Dataflow currently only handles
+              # keyed side inputs.
+              pipeline = side_input.pvalue.pipeline
+              new_side_input = _DataflowIterableSideInput(side_input)
+              new_side_input.pvalue = beam.pvalue.PCollection(
+                  pipeline,
+                  element_type=typehints.KV[
+                      str, side_input.pvalue.element_type])
+              parent = transform_node.parent or pipeline._root_transform()
 
 Review comment:
   That is a good question, but I did run into this case. (Probably a bug, filed BEAM-3871 for follow-up.)
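
The diff replaces the inline `coerce_to_kv_type` helper with a call to `typehints.coerce_to_kv_type(pcoll.element_type, transform_node.full_label)`, and the new `SideInputVisitor` rewrites iterable side inputs as `('', value)` pairs because, per the code comment, Dataflow only handles keyed side inputs. Below is a minimal pure-Python sketch of both ideas. It assumes simplified stand-in type classes (`AnyT`, `TupleT`, `UnionT` and the helper `KV`), which are hypothetical and are NOT Beam's `typehints` API; `key_iterable_side_input` and `unkey_iterable_side_input` are likewise illustrative names. The coercion mirrors the branch structure of the removed helper (unspecified treated as Any, a two-component tuple kept, Any mapped to KV[Any, Any], a Union split into a KV of per-position unions), except that this sketch tests `element_type` in the Any branch, whereas the removed code tested `input_type` at that point.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


# Hypothetical stand-ins for Beam type hints (NOT apache_beam.typehints).
@dataclass(frozen=True)
class AnyT:
    """Model of an unconstrained (`Any`) element type."""


@dataclass(frozen=True)
class TupleT:
    """Model of a tuple type hint; a KV is modeled as a 2-tuple."""
    types: Tuple[object, ...]


@dataclass(frozen=True)
class UnionT:
    """Model of a union type hint."""
    types: Tuple[object, ...]


def KV(key_type, value_type):
    """Build the 2-tuple model used for key/value element types."""
    return TupleT((key_type, value_type))


def coerce_to_kv_type(element_type, label):
    """Coerce a GroupByKey input type to the KV model, or raise ValueError."""
    if element_type is None:
        # An unspecified element type is treated as Any.
        element_type = AnyT()
    if isinstance(element_type, TupleT):
        if len(element_type.types) == 2:
            return element_type
        raise ValueError(
            "Tuple input to GroupByKey must have two components. "
            "Found %s for %s" % (element_type, label))
    if isinstance(element_type, AnyT):
        # As in the removed code, Any is replaced by KV[Any, Any].
        return KV(AnyT(), AnyT())
    if isinstance(element_type, UnionT):
        # Coerce each member, then build KV[Union[keys], Union[values]].
        kvs = [coerce_to_kv_type(t, label) for t in element_type.types]
        return KV(UnionT(tuple(kv.types[0] for kv in kvs)),
                  UnionT(tuple(kv.types[1] for kv in kvs)))
    raise ValueError(
        "Input to GroupByKey must be of Tuple or Any type. "
        "Found %s for %s" % (element_type, label))


def key_iterable_side_input(values: Iterable[object]) -> List[Tuple[str, object]]:
    """Pair every element with the empty-string key, i.e. ('', value)."""
    return [("", v) for v in values]


def unkey_iterable_side_input(pairs: Iterable[Tuple[str, object]]) -> List[object]:
    """Drop the keys again to recover the original iterable contents."""
    return [v for _, v in pairs]
```

Under this model, a union of `KV(int, str)` and `KV(float, bytes)` coerces to `KV(UnionT((int, float)), UnionT((str, bytes)))`; the sketch makes no attempt to simplify or deduplicate union members. Keying every element with the same empty string keeps the side input's contents intact while giving it the KV shape that the comment says Dataflow expects.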

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 81897)
    Time Spent: 1.5h  (was: 1h 20m)

> Python SDK support for portable side input
> ------------------------------------------
>
>                 Key: BEAM-2927
>                 URL: https://issues.apache.org/jira/browse/BEAM-2927
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Henning Rohde
>            Assignee: Robert Bradshaw
>            Priority: Major
>              Labels: portability
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)