In the past there was BoundedReadFromUnboundedSource, which is still, IINM, used in 
KafkaIO to limit a read by number of records or time. At the same time, though, we 
had a discussion that it should no longer be used and should be considered an 
obsolete transform.

> On 18 Sep 2023, at 09:28, Jan Lukavský <je...@seznam.cz> wrote:
> 
> Do we have a defined way for a PTransform to create a bounded PCollection from 
> an unbounded one (a typical example would be LIMIT acting on unbounded 
> input)? AFAIK, we can use SDF to manipulate the watermark, but that requires 
> terminating the Pipeline even though there are still upstream running 
> transforms (e.g. sources). I'm not sure if we have a sound definition of when 
> a runner should terminate a Pipeline, so I guess this is runner dependent, 
> right? If I'm not wrong, Flink, for example, does not terminate the Pipeline 
> while at least one operator is still running, so this might require signalling 
> sources from the sink (thus introducing some form of cycle).
> 
>  Jan
> 
> On 9/15/23 18:55, Robert Bradshaw via user wrote:
>> On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user <user@beam.apache.org> wrote:
>>> Creating composite DoFns is tricky today due to how they are implemented 
>>> (via annotated methods).
>> 
>> Note that this depends on the language. This should be really easy to do 
>> from Python. 
>>  
>>> However providing such a method to compose DoFns would be very useful IMO.
>> 
>> +1
>>  
>>> On Fri, Sep 15, 2023 at 9:33 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>> Yeah for (1) the concern would be adding a shuffle/fusion break and (2) 
>>>> sounds like the likely solution, was just hoping there'd be one that could 
>>>> wrap at the PTransform level but I realize now the PTransform abstraction 
>>>> is too general as you mentioned to do something like that.
>>>> 
>>>> (2) will likely be what we do, though now I'm wondering if it might be 
>>>> possible to create a ParDo wrapper that can take a ParDo, extract its 
>>>> DoFn, wrap it, and return a new ParDo.
>>>> 
>>>> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <user@beam.apache.org> wrote:
>>>>> +1 to looking at composite transforms. You could even have a composite 
>>>>> transform that takes another transform as one of its construction 
>>>>> arguments and whose expand method does pre- and post-processing to the 
>>>>> inputs/outputs before/after applying the transform in question. (You 
>>>>> could even implement this as a Python decorator if you really wanted, 
>>>>> either decorating the expand method itself or the full class...)
>>>>> 
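
A minimal sketch of the wrapping idea described above, written in plain Python with no Beam dependency so that it runs on its own. Lists stand in for PCollections, and `Explode`, `WithPrePost`, and their `expand` methods are hypothetical stand-ins for a real `PTransform`; in Beam the wrapper would presumably subclass `beam.PTransform` and apply the inner transform inside its own `expand`.

```python
class Explode:
    """Hypothetical stand-in for an existing transform: emits each input n times."""

    def __init__(self, n):
        self.n = n

    def expand(self, pcoll):
        return [x for x in pcoll for _ in range(self.n)]


class WithPrePost:
    """Composite stand-in: pre-process, apply the inner transform, post-process."""

    def __init__(self, inner, pre, post):
        self.inner = inner  # the transform passed in at construction time
        self.pre = pre      # applied to every input element
        self.post = post    # applied to every output element

    def expand(self, pcoll):
        prepared = [self.pre(x) for x in pcoll]
        produced = self.inner.expand(prepared)
        return [self.post(x) for x in produced]


wrapped = WithPrePost(Explode(2), pre=str.strip, post=str.upper)
result = wrapped.expand([" a ", "b "])
```

The wrapper never looks inside `Explode`; it only touches what goes in and what comes out, which is why this shape works even when the inner transform does not expose its `DoFn`.
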
>>>>> One of the difficulties is that for a general transform there isn't 
>>>>> necessarily a 1:N relationship between outputs and inputs as one has for 
>>>>> a DoFn (especially if there is any aggregation involved). There are, 
>>>>> however, two partial solutions that might help. 
>>>>> 
>>>>> (1) You can do a CombineGlobally with a CombineFn (Like Sample) that 
>>>>> returns at most N elements. You could do this with a CombinePerKey if you 
>>>>> can come up with a reasonable key (e.g. the id of your input elements) 
>>>>> that the limit should be applied to. Note that this may cause a lot of 
>>>>> data to be shuffled (though due to combiner lifting, no more than N per 
>>>>> bundle). 
>>>>> 
>>>>> (2) You could have a DoFn that limits to N per bundle by initializing a 
>>>>> counter in its start_bundle and passing elements through until the 
>>>>> counter reaches a threshold. (Again, one could do this per id if one is 
>>>>> available.) It wouldn't stop production of the elements, but if things 
>>>>> get fused it would still likely be fairly cheap. 
>>>>> 
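
Option (1) above can be sketched roughly as follows. This is plain Python that only mirrors the four-method shape of a Beam `CombineFn`; `FirstN` and `combine_bundles` are hypothetical names, and `FirstN` keeps an arbitrary N elements rather than a uniform random sample as `Sample` would. The helper simulates combiner lifting by building one accumulator per bundle before merging.

```python
class FirstN:
    """CombineFn-shaped class that keeps at most n elements (arbitrary ones)."""

    def __init__(self, n):
        self.n = n

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        # Drop anything beyond the limit so an accumulator never exceeds n.
        if len(accumulator) < self.n:
            accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for acc in accumulators:
            merged.extend(acc[: self.n - len(merged)])
            if len(merged) >= self.n:
                break
        return merged

    def extract_output(self, accumulator):
        return accumulator


def combine_bundles(fn, bundles):
    """Simulates lifting: pre-combine each bundle, then merge the partials."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return partials, fn.extract_output(fn.merge_accumulators(partials))


partials, limited = combine_bundles(
    FirstN(3), [range(0, 10), range(10, 12), range(20, 30)]
)
```

Each partial accumulator holds at most N elements, which is the "no more than N per bundle" bound on shuffled data mentioned above.
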
>>>>> Both of these could be prepended to the problematic consuming PTransform 
>>>>> as well. 
>>>>> 
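
Option (2) could look something like the sketch below. It is plain Python that mimics the `start_bundle`/`process` shape of a Python `DoFn` without importing Beam; `LimitPerBundle` and `run_bundles` are hypothetical names, and `run_bundles` is only a stand-in for how a runner would drive bundles.

```python
class LimitPerBundle:
    """DoFn-shaped class that passes through at most `limit` elements per bundle."""

    def __init__(self, limit):
        self.limit = limit
        self.seen = 0

    def start_bundle(self):
        # Reset the counter at the start of every bundle.
        self.seen = 0

    def process(self, element):
        if self.seen < self.limit:
            self.seen += 1
            yield element
        # Elements past the threshold are silently dropped.


def run_bundles(fn, bundles):
    """Stand-in for a runner: calls start_bundle, then process per element."""
    out = []
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            out.extend(fn.process(element))
    return out


passed = run_bundles(LimitPerBundle(2), [[1, 2, 3, 4], [5], [6, 7, 8]])
```

Because the limit is per bundle, the total that gets through depends on how the runner happens to split the input into bundles, so this caps cost rather than giving an exact global limit.
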
>>>>> - Robert
>>>>> 
>>>>> 
>>>>> 
>>>>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>> I'm aware of composite transforms and of the distributed nature of 
>>>>>> PTransforms. I'm not suggesting limiting the entire set and my example 
>>>>>> was more illustrative than the actual use case.
>>>>>> 
>>>>>> My actual use case is basically: I have multiple PTransforms, and let's 
>>>>>> say most of them average ~100 generated outputs for a single input. Most 
>>>>>> of these PTransforms will occasionally run into an input though that 
>>>>>> might output maybe 1M outputs. This can cause issues if for example 
>>>>>> there are transforms that follow it that require a lot of compute per 
>>>>>> input. 
>>>>>> 
>>>>>> The simplest way to deal with this is to modify the `DoFn`s in our 
>>>>>> PTransforms and add a limiter in the logic (e.g. `if 
>>>>>> num_outputs_generated >= OUTPUTS_PER_INPUT_LIMIT: return`). We could 
>>>>>> duplicate this logic across our transforms, but it'd be much cleaner if 
>>>>>> we could lift up this limiting logic out of the application logic and 
>>>>>> have some generic wrapper that extends our transforms.
>>>>>> 
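
A rough sketch of such a generic limiter, assuming the wrapped object has a `process` method that lazily yields its outputs. It is plain Python with no Beam import; `Fanout` and `LimitOutputsPerInput` are hypothetical names. A real Beam wrapper would also need to subclass `beam.DoFn` and forward lifecycle methods such as `setup` and `start_bundle` to the wrapped `DoFn`.

```python
import itertools


class Fanout:
    """Hypothetical stand-in for an existing DoFn: lazily yields `element` outputs."""

    def __init__(self):
        self.generated = 0  # counts how many outputs were actually computed

    def process(self, element):
        for i in range(element):
            self.generated += 1
            yield (element, i)


class LimitOutputsPerInput:
    """Wraps a DoFn-shaped object and caps the outputs emitted per input."""

    def __init__(self, inner, limit):
        self.inner = inner
        self.limit = limit

    def process(self, element):
        outputs = self.inner.process(element)
        if outputs is None:  # process may legitimately return nothing
            return
        # islice stops pulling from the generator once the limit is reached.
        yield from itertools.islice(outputs, self.limit)


OUTPUTS_PER_INPUT_LIMIT = 1000
inner = Fanout()
limited = LimitOutputsPerInput(inner, OUTPUTS_PER_INPUT_LIMIT)
small = list(limited.process(100))
large = list(limited.process(1_000_000))
```

Since the inner `process` is a generator, the outputs past the limit are never computed. If the wrapped `process` returned a fully built list instead, the wrapper would still cap what is emitted downstream but would not save the generation cost.
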
>>>>>> Thanks for the discussion!
>>>>>> 
>>>>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>>>> I don’t think it’s possible to extend them in the way you are asking 
>>>>>>> (like Java classes “extend”). Though, you can create your own 
>>>>>>> composite PTransform that incorporates one or several others inside its 
>>>>>>> “expand()” method. Actually, most of Beam’s native PTransforms are 
>>>>>>> composite transforms. Please take a look at the docs and examples [1].
>>>>>>> 
>>>>>>> Regarding your example, please be aware that all PTransforms are 
>>>>>>> supposed to be executed in a distributed environment and the order of 
>>>>>>> records is not guaranteed. So, limiting the whole output to a fixed 
>>>>>>> number of records can be challenging: you’d need to make sure that it 
>>>>>>> is processed on only one worker, which means that you’d need to 
>>>>>>> shuffle all your records by the same key and probably sort the records 
>>>>>>> in the way that you need.
>>>>>>> 
>>>>>>> Did you consider using “org.apache.beam.sdk.transforms.Top” for that? 
>>>>>>> [2]
>>>>>>> 
>>>>>>> If it doesn’t work for you, could you provide more details of your use 
>>>>>>> case? Then we can probably propose more suitable solutions for it.
>>>>>>> 
>>>>>>> [1] 
>>>>>>> https://beam.apache.org/documentation/programming-guide/#composite-transforms
>>>>>>> [2] 
>>>>>>> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
>>>>>>> 
>>>>>>> —
>>>>>>> Alexey
>>>>>>> 
>>>>>>>> On 15 Sep 2023, at 14:22, Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>> 
>>>>>>>> Is there a way to extend already defined PTransforms? My question is 
>>>>>>>> probably better illustrated with an example. Let's say I have a 
>>>>>>>> PTransform that generates a very variable number of outputs. I'd like 
>>>>>>>> to "wrap" that PTransform such that if it ever creates more than say 
>>>>>>>> 1,000 outputs, then I just take the first 1,000 outputs without 
>>>>>>>> generating the rest of the outputs.
>>>>>>>> 
>>>>>>>> It'd be trivial if I have access to the DoFn, but what if the 
>>>>>>>> PTransform in question doesn't expose the `DoFn`?
>>>>>>> 
