Yeah for (1) the concern would be adding a shuffle/fusion break and (2)
sounds like the likely solution, was just hoping there'd be one that could
wrap at the PTransform level but I realize now the PTransform abstraction
is too general as you mentioned to do something like that.

(2) will be likely what we do, though now I'm wondering if it might be
possible to create a ParDo wrapper that can take a ParDo, extract it's
dofn, wrap it, and return a new ParDo

On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <> wrote:

> +1 to looking at composite transforms. You could even have a composite
> transform that takes another transform as one of its construction arguments
> and whose expand method does pre- and post-processing to the inputs/outputs
> before/after applying the transform in question. (You could even implement
> this as a Python decorator if you really wanted, either decorating the
> expand method itself or the full class...)
> One of the difficulties is that for a general transform there isn't
> necessarily a 1:N relationship between outputs and inputs as one has for a
> DoFn (especially if there is any aggregation involved). There are, however,
> two partial solutions that might help.
> (1) You can do a CombineGlobally with a CombineFn (Like Sample) that
> returns at most N elements. You could do this with a CombinePerKey if you
> can come up with a reasonable key (e.g. the id of your input elements) that
> the limit should be a applied to. Note that this may cause a lot of data to
> be shuffled (though due to combiner lifting, no more than N per bundle).
> (2) You could have a DoFn that limits to N per bundle by initializing a
> counter in its start_bundle and passing elements through until the counter
> reaches a threshold. (Again, one could do this per id if one is available.)
> It wouldn't stop production of the elements, but if things get fused it
> would still likely be fairly cheap.
> Both of these could be prepended to the problematic consuming PTransform
> as well.
> - Robert
> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran <>
> wrote:
>> I'm aware of composite transforms and of the distributed nature of
>> PTransforms. I'm not suggesting limiting the entire set and my example was
>> more illustrative than the actual use case.
>> My actual use case is basically: I have multiple PTransforms, and let's
>> say most of them average ~100 generated outputs for a single input. Most of
>> these PTransforms will occasionally run into an input though that might
>> output maybe 1M outputs. This can cause issues if for example there are
>> transforms that follow it that require a lot of compute per input.
>> The simplest way to deal with this is to modify the `DoFn`s in our
>> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
>> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
>> our transforms, but it'd be much cleaner if we could lift up this limiting
>> logic out of the application logic and have some generic wrapper that
>> extends our transforms.
>> Thanks for the discussion!
>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
>>> wrote:
>>> I don’t think it’s possible to extend in a way that you are asking
>>> (like, Java classes “*extend*"). Though, you can create your own
>>> composite PTransform that will incorporate one or several others inside
>>> *“expand()”* method. Actually, most of the Beam native PTransforms are
>>> composite transforms. Please, take a look on doc and examples [1]
>>> Regarding your example, please, be aware that all PTransforms are
>>> supposed to be executed in distributed environment and the order of records
>>> is not guaranteed. So, limiting the whole output by fixed number of records
>>> can be challenging - you’d need to make sure that it will be processed on
>>> only one worker, that means that you’d need to shuffle all your records by
>>> the same key and probably sort the records in way that you need.
>>> Did you consider to use “*org.apache.beam.sdk.transforms.Top*” for
>>> that? [2]
>>> If it doesn’t work for you, could you provide more details of your use
>>> case? Then we probably can propose the more suitable solutions for that.
>>> [1]
>>> [2]
>>> —
>>> Alexey
>>> On 15 Sep 2023, at 14:22, Joey Tran <> wrote:
>>> Is there a way to extend already defined PTransforms? My question is
>>> probably better illustrated with an example. Let's say I have a PTransform
>>> that generates a very variable number of outputs. I'd like to "wrap" that
>>> PTransform such that if it ever creates more than say 1,000 outputs, then I
>>> just take the first 1,000 outputs without generating the rest of the
>>> outputs.
>>> It'd be trivial if I have access to the DoFn, but what if the PTransform
>>> in question doesn't expose the `DoFn`?

Reply via email to