[ 
https://issues.apache.org/jira/browse/BEAM-6067?focusedWorklogId=172700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172700
 ]

ASF GitHub Bot logged work on BEAM-6067:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Dec/18 17:08
            Start Date: 06/Dec/18 17:08
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #7081: [BEAM-6067] In 
Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard 
CloudObject coders
URL: https://github.com/apache/beam/pull/7081
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/sdks/python/apache_beam/coders/coders.py 
b/sdks/python/apache_beam/coders/coders.py
index f5c90a8ec99e..f2a4b2ee724a 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -190,7 +190,7 @@ def _get_component_coders(self):
     # refined in user-defined Coders.
     return []
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     """For internal use only; no backwards-compatibility guarantees.
 
     Returns Google Cloud Dataflow API description of this coder."""
@@ -201,12 +201,17 @@ def as_cloud_object(self):
         # We pass coders in the form "<coder_name>$<pickled_data>" to make the
         # job description JSON more readable.  Data before the $ is ignored by
         # the worker.
-        '@type': serialize_coder(self),
-        'component_encodings': list(
-            component.as_cloud_object()
+        '@type':
+            serialize_coder(self),
+        'component_encodings': [
+            component.as_cloud_object(coders_context)
             for component in self._get_component_coders()
-        ),
+        ],
     }
+
+    if coders_context:
+      value['pipeline_proto_coder_id'] = coders_context.get_id(self)
+
     return value
 
   def __repr__(self):
@@ -370,7 +375,7 @@ def _create_impl(self):
   def is_deterministic(self):
     return True
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:bytes',
     }
@@ -394,7 +399,7 @@ def _create_impl(self):
   def is_deterministic(self):
     return True
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:varint',
     }
@@ -516,8 +521,8 @@ def is_deterministic(self):
     # GroupByKey operations.
     return False
 
-  def as_cloud_object(self, is_pair_like=True):
-    value = super(_PickleCoderBase, self).as_cloud_object()
+  def as_cloud_object(self, coders_context=None, is_pair_like=True):
+    value = super(_PickleCoderBase, self).as_cloud_object(coders_context)
     # We currently use this coder in places where we cannot infer the coder to
     # use for the value type in a more granular way.  In places where the
     # service expects a pair, it checks for the "is_pair_like" key, in which
@@ -525,8 +530,8 @@ def as_cloud_object(self, is_pair_like=True):
     if is_pair_like:
       value['is_pair_like'] = True
       value['component_encodings'] = [
-          self.as_cloud_object(is_pair_like=False),
-          self.as_cloud_object(is_pair_like=False)
+          self.as_cloud_object(coders_context, is_pair_like=False),
+          self.as_cloud_object(coders_context, is_pair_like=False)
       ]
 
     return value
@@ -615,8 +620,8 @@ def as_deterministic_coder(self, step_label, 
error_message=None):
     else:
       return DeterministicFastPrimitivesCoder(self, step_label)
 
-  def as_cloud_object(self, is_pair_like=True):
-    value = super(FastCoder, self).as_cloud_object()
+  def as_cloud_object(self, coders_context=None, is_pair_like=True):
+    value = super(FastCoder, self).as_cloud_object(coders_context)
     # We currently use this coder in places where we cannot infer the coder to
     # use for the value type in a more granular way.  In places where the
     # service expects a pair, it checks for the "is_pair_like" key, in which
@@ -624,8 +629,8 @@ def as_cloud_object(self, is_pair_like=True):
     if is_pair_like:
       value['is_pair_like'] = True
       value['component_encodings'] = [
-          self.as_cloud_object(is_pair_like=False),
-          self.as_cloud_object(is_pair_like=False)
+          self.as_cloud_object(coders_context, is_pair_like=False),
+          self.as_cloud_object(coders_context, is_pair_like=False)
       ]
 
     return value
@@ -744,18 +749,20 @@ def as_deterministic_coder(self, step_label, 
error_message=None):
   def from_type_hint(typehint, registry):
     return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     if self.is_kv_coder():
       return {
-          '@type': 'kind:pair',
-          'is_pair_like': True,
-          'component_encodings': list(
-              component.as_cloud_object()
+          '@type':
+              'kind:pair',
+          'is_pair_like':
+              True,
+          'component_encodings': [
+              component.as_cloud_object(coders_context)
               for component in self._get_component_coders()
-          ),
+          ],
       }
 
-    return super(TupleCoder, self).as_cloud_object()
+    return super(TupleCoder, self).as_cloud_object(coders_context)
 
   def _get_component_coders(self):
     return self.coders()
@@ -853,11 +860,15 @@ def as_deterministic_coder(self, step_label, 
error_message=None):
       return IterableCoder(
           self._elem_coder.as_deterministic_coder(step_label, error_message))
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
-        '@type': 'kind:stream',
-        'is_stream_like': True,
-        'component_encodings': [self._elem_coder.as_cloud_object()],
+        '@type':
+            'kind:stream',
+        'is_stream_like':
+            True,
+        'component_encodings': [
+            self._elem_coder.as_cloud_object(coders_context)
+        ],
     }
 
   def value_coder(self):
@@ -891,7 +902,7 @@ def __init__(self):
     from apache_beam.transforms import window
     super(GlobalWindowCoder, self).__init__(window.GlobalWindow())
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:global_window',
     }
@@ -910,7 +921,7 @@ def _create_impl(self):
   def is_deterministic(self):
     return True
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
         '@type': 'kind:interval_window',
     }
@@ -947,13 +958,16 @@ def is_deterministic(self):
                                               self.timestamp_coder,
                                               self.window_coder])
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
-        '@type': 'kind:windowed_value',
-        'is_wrapper': True,
+        '@type':
+            'kind:windowed_value',
+        'is_wrapper':
+            True,
         'component_encodings': [
-            component.as_cloud_object()
-            for component in self._get_component_coders()],
+            component.as_cloud_object(coders_context)
+            for component in self._get_component_coders()
+        ],
     }
 
   def _get_component_coders(self):
@@ -1007,10 +1021,13 @@ def estimate_size(self, value):
   def value_coder(self):
     return self._value_coder
 
-  def as_cloud_object(self):
+  def as_cloud_object(self, coders_context=None):
     return {
-        '@type': 'kind:length_prefix',
-        'component_encodings': [self._value_coder.as_cloud_object()],
+        '@type':
+            'kind:length_prefix',
+        'component_encodings': [
+            self._value_coder.as_cloud_object(coders_context)
+        ],
     }
 
   def _get_component_coders(self):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 88b03d47a304..f53808d2b037 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -398,10 +398,10 @@ def run_pipeline(self, pipeline, options):
     result.metric_results = self._metrics
     return result
 
-  def _get_typehint_based_encoding(self, typehint, window_coder):
+  def _get_typehint_based_encoding(self, typehint, window_coder, use_fnapi):
     """Returns an encoding based on a typehint object."""
-    return self._get_cloud_encoding(self._get_coder(typehint,
-                                                    window_coder=window_coder))
+    return self._get_cloud_encoding(
+        self._get_coder(typehint, window_coder=window_coder), use_fnapi)
 
   @staticmethod
   def _get_coder(typehint, window_coder):
@@ -412,12 +412,13 @@ def _get_coder(typehint, window_coder):
           window_coder=window_coder)
     return coders.registry.get_coder(typehint)
 
-  def _get_cloud_encoding(self, coder):
+  def _get_cloud_encoding(self, coder, use_fnapi):
     """Returns an encoding based on a coder object."""
     if not isinstance(coder, coders.Coder):
       raise TypeError('Coder object must inherit from coders.Coder: %s.' %
                       str(coder))
-    return coder.as_cloud_object()
+    return coder.as_cloud_object(self.proto_context
+                                 .coders if use_fnapi else None)
 
   def _get_side_input_encoding(self, input_encoding):
     """Returns an encoding for the output of a view transform.
@@ -454,8 +455,11 @@ def _get_encoded_output_coder(self, transform_node, 
window_value=True):
           transform_node.outputs[None].windowing.windowfn.get_window_coder())
     else:
       window_coder = None
-    return self._get_typehint_based_encoding(
-        element_type, window_coder=window_coder)
+    from apache_beam.runners.dataflow.internal import apiclient
+    use_fnapi = apiclient._use_fnapi(
+        transform_node.outputs.values()[0].pipeline._options)
+    return self._get_typehint_based_encoding(element_type, window_coder,
+                                             use_fnapi)
 
   def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
     """Creates a Step object and adds it to the cache."""
@@ -753,7 +757,8 @@ def run_CombineValues(self, transform_node, options):
     # The data transmitted in SERIALIZED_FN is different depending on whether
     # this is a fnapi pipeline or not.
     from apache_beam.runners.dataflow.internal import apiclient
-    if apiclient._use_fnapi(options):
+    use_fnapi = apiclient._use_fnapi(options)
+    if use_fnapi:
       # Fnapi pipelines send the transform ID of the CombineValues transform's
       # parent composite because Dataflow expects the ID of a CombinePerKey
       # transform.
@@ -775,7 +780,7 @@ def run_CombineValues(self, transform_node, options):
     # Note that the accumulator must not have a WindowedValue encoding, while
     # the output of this step does in fact have a WindowedValue encoding.
     accumulator_encoding = self._get_cloud_encoding(
-        transform_node.transform.fn.get_accumulator_coder())
+        transform_node.transform.fn.get_accumulator_coder(), use_fnapi)
     output_encoding = self._get_encoded_output_coder(transform_node)
 
     step.encoding = output_encoding
@@ -911,7 +916,9 @@ def run_Read(self, transform_node, options):
         coders.registry.get_coder(transform_node.outputs[None].element_type),
         coders.coders.GlobalWindowCoder())
 
-    step.encoding = self._get_cloud_encoding(coder)
+    from apache_beam.runners.dataflow.internal import apiclient
+    use_fnapi = apiclient._use_fnapi(options)
+    step.encoding = self._get_cloud_encoding(coder, use_fnapi)
     step.add_property(
         PropertyNames.OUTPUT_INFO,
         [{PropertyNames.USER_NAME: (
@@ -993,7 +1000,9 @@ def run__NativeWrite(self, transform_node, options):
     # correct coder.
     coder = coders.WindowedValueCoder(transform.sink.coder,
                                       coders.coders.GlobalWindowCoder())
-    step.encoding = self._get_cloud_encoding(coder)
+    from apache_beam.runners.dataflow.internal import apiclient
+    use_fnapi = apiclient._use_fnapi(options)
+    step.encoding = self._get_cloud_encoding(coder, use_fnapi)
     step.add_property(PropertyNames.ENCODING, step.encoding)
     step.add_property(
         PropertyNames.PARALLEL_INPUT,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 172700)
            Time Spent: 7h 20m  (was: 7h 10m)
    Remaining Estimate: 160h 40m  (was: 160h 50m)

> Dataflow runner should include portable pipeline coder id in CloudObject 
> coder representation
> ---------------------------------------------------------------------------------------------
>
>                 Key: BEAM-6067
>                 URL: https://issues.apache.org/jira/browse/BEAM-6067
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Craig Chambers
>            Assignee: Craig Chambers
>            Priority: Major
>   Original Estimate: 168h
>          Time Spent: 7h 20m
>  Remaining Estimate: 160h 40m
>
> When translating a Beam Java Coder into the DataflowRunner's CloudObject 
> property map, include a property that specifies the id in the Beam model 
> Pipeline coders map corresponding to that Coder.  This will allow the 
> DataflowRunner to reference the corresponding Beam coder in the FnAPI 
> processing bundle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to