claudevdm commented on code in PR #35601: URL: https://github.com/apache/beam/pull/35601#discussion_r2214501933
########## sdks/python/apache_beam/transforms/core.py: ##########
@@ -2927,6 +2927,22 @@ class CombinePerKey(PTransformWithSideInputs):
   Returns:
     A PObject holding the result of the combine operation.
   """
+  def __new__(cls, *args, **kwargs):
+    def has_side_inputs():
+      return (
+          any(isinstance(arg, pvalue.AsSideInput) for arg in args) or
+          any(isinstance(arg, pvalue.AsSideInput) for arg in kwargs.values()))
+
+    if has_side_inputs():
+      # If the CombineFn has deferred side inputs, the python SDK
+      # doesn't implement it.
+      # Use a ParDo-based CombinePerKey instead.
+      from apache_beam.transforms.combiners import \
+        LiftedCombinePerKey
+      combine_fn, *args = args
+      return LiftedCombinePerKey(combine_fn, args, kwargs)

Review Comment:
Maybe we should log a warning here saying that the runner-implemented CBK has been replaced because of the side inputs, which may affect performance? Users might not be aware this is happening.