+1 to looking at composite transforms. You could even have a composite
transform that takes another transform as one of its construction arguments
and whose expand method does pre- and post-processing to the inputs/outputs
before/after applying the transform in question. (You could even implement
this as a Python decorator if you really wanted, either decorating the
expand method itself or the full class...)

One of the difficulties is that for a general transform there isn't
necessarily a 1:N relationship between inputs and outputs as one has for a
DoFn (especially if there is any aggregation involved). There are, however,
two partial solutions that might help.

(1) You can do a CombineGlobally with a CombineFn (like Sample) that
returns at most N elements. You could do this with a CombinePerKey if you
can come up with a reasonable key (e.g. the id of your input elements) that
the limit should be applied to. Note that this may cause a lot of data to
be shuffled (though due to combiner lifting, no more than N per bundle).

(2) You could have a DoFn that limits to N per bundle by initializing a
counter in its start_bundle and passing elements through until the counter
reaches a threshold. (Again, one could do this per id if one is available.)
It wouldn't stop production of the elements, but if things get fused it
would still likely be fairly cheap.

Both of these could be prepended to the problematic consuming PTransform as
well.

- Robert



On Fri, Sep 15, 2023 at 8:13 AM Joey Tran <joey.t...@schrodinger.com> wrote:

> I'm aware of composite transforms and of the distributed nature of
> PTransforms. I'm not suggesting limiting the entire set and my example was
> more illustrative than the actual use case.
>
> My actual use case is basically: I have multiple PTransforms, and let's
> say most of them average ~100 generated outputs for a single input. Most of
> these PTransforms will occasionally run into an input, though, that might
> produce ~1M outputs. This can cause issues if, for example, downstream
> transforms require a lot of compute per input.
>
> The simplest way to deal with this is to modify the `DoFn`s in our
> PTransforms and add a limiter in the logic (e.g.
> `if num_outputs_generated >= OUTPUTS_PER_INPUT_LIMIT: return`). We could
> duplicate this logic across
> our transforms, but it'd be much cleaner if we could lift up this limiting
> logic out of the application logic and have some generic wrapper that
> extends our transforms.
>
> Thanks for the discussion!
>
> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
> aromanenko....@gmail.com> wrote:
>
>> I don’t think it’s possible to extend a transform in the way you are asking
>> (like Java classes “*extend*”). That said, you can create your own composite
>> PTransform that incorporates one or several others inside its
>> *“expand()”* method. Actually, most of the Beam native PTransforms are
>> composite transforms. Please take a look at the docs and examples [1].
>>
>> Regarding your example, please be aware that all PTransforms are
>> supposed to be executed in a distributed environment and the order of records
>> is not guaranteed. So, limiting the whole output to a fixed number of records
>> can be challenging: you’d need to make sure that it is processed on
>> only one worker, which means that you’d need to shuffle all your records by
>> the same key and probably sort the records in the way that you need.
>>
>> Did you consider using “*org.apache.beam.sdk.transforms.Top*” for that?
>> [2]
>>
>> If it doesn’t work for you, could you provide more details of your use
>> case? Then we can probably propose more suitable solutions.
>>
>> [1]
>> https://beam.apache.org/documentation/programming-guide/#composite-transforms
>> [2]
>> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
>>
>> —
>> Alexey
>>
>> On 15 Sep 2023, at 14:22, Joey Tran <joey.t...@schrodinger.com> wrote:
>>
>> Is there a way to extend already defined PTransforms? My question is
>> probably better illustrated with an example. Let's say I have a PTransform
>> that generates a very variable number of outputs. I'd like to "wrap" that
>> PTransform such that if it ever creates more than say 1,000 outputs, then I
>> just take the first 1,000 outputs without generating the rest of the
>> outputs.
>>
>> It'd be trivial if I have access to the DoFn, but what if the PTransform
>> in question doesn't expose the `DoFn`?
>>
>>
>>
