[ 
https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269684#comment-16269684
 ] 

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653671
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/util.py
 ##########
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
+      def process(self, element):
+        return [(element[0], value) for value in element[1]]
+
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+                timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?
+    pcoll_intermediate = (pcoll
+            | 'ReifyTimestampsIn' >> ParDo(ReifyTimestampsIn())
+            | 'IdentityWindow' >> WindowInto(
+                IdentityWindowFn(windowing_saved.windowfn.get_window_coder()),
+                trigger=TriggerForEveryElement(),
+                accumulation_mode=AccumulationMode.DISCARDING,
+                # TODO: timestamp_combiner=
+                )
+            | 'GroupByKey' >> GroupByKey()
+            | 'ExpandIterable' >> ParDo(ExpandIterableDoFn()))
+    pcoll_intermediate.windowing = windowing_saved
 
 Review comment:
   <!--thread_id:cc_151220547_t; 
commit:4fa4caa4b4991fe994ce3938f37be975421c6761; resolved:1-->
   <!--section:context-quote-->
   > **robertwb** wrote:
   > Nit: I'd probably apply assign windowing as the very last thing rather than on this intermediate.
   
   <!--section:body-->
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
>                 Key: BEAM-1872
>                 URL: https://issues.apache.org/jira/browse/BEAM-1872
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Udi Meiri
>              Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to