claudevdm commented on code in PR #35601:
URL: https://github.com/apache/beam/pull/35601#discussion_r2214501933


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2927,6 +2927,22 @@ class CombinePerKey(PTransformWithSideInputs):
   Returns:
     A PObject holding the result of the combine operation.
   """
+  def __new__(cls, *args, **kwargs):
+    def has_side_inputs():
+      return (
+          any(isinstance(arg, pvalue.AsSideInput) for arg in args) or
+          any(isinstance(arg, pvalue.AsSideInput) for arg in kwargs.values()))
+
+    if has_side_inputs():
+      # If the CombineFn has deferred side inputs, the python SDK
+      # doesn't implement it.
+      # Use a ParDo-based CombinePerKey instead.
+      from apache_beam.transforms.combiners import \
+        LiftedCombinePerKey
+      combine_fn, *args = args
+      return LiftedCombinePerKey(combine_fn, args, kwargs)

Review Comment:
   Maybe we should log a warning here saying that the runner-implemented CBK
has been replaced because of side inputs, and that this may affect
performance? Users might not be aware this is happening.



