claudevdm commented on code in PR #35601:
URL: https://github.com/apache/beam/pull/35601#discussion_r2210788701


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2917,6 +2917,22 @@ class CombinePerKey(PTransformWithSideInputs):
   Returns:
     A PObject holding the result of the combine operation.
   """
+  def __new__(cls, *args, **kwargs):
+    def has_side_inputs():
+      return (
+          any(isinstance(arg, pvalue.AsSideInput) for arg in args) or
+          any(isinstance(arg, pvalue.AsSideInput) for arg in kwargs.values()))
+
+    if has_side_inputs():
+      # If the CombineFn has deferred side inputs, the python SDK
+      # doesn't implement it.
+      # Use a ParDo-based CombinePerKey instead.
+      from apache_beam.runners.direct.helper_transforms import \

Review Comment:
   Is this meant to only apply to pipelines running on direct runner?
   
   It seems strange to import transforms from direct runner into 
transforms.core.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to