In the past it was BoundedReadFromUnboundedSource that is still, if I'm not mistaken, used in KafkaIO to limit a read by number of records or by time. At the same time, though, we had a discussion that it should not be used anymore and should be considered an obsolete transform.
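Just to make that concrete, below is a minimal Python sketch of such a capped Kafka read. This is from memory and unverified: the max_num_records parameter of the cross-language ReadFromKafka wrapper (mirroring Java's withMaxNumRecords) is assumed here, and the broker/topic values are placeholders.

```python
# Sketch only: assumes the cross-language ReadFromKafka wrapper exposes
# max_num_records (parameter names may differ between SDK versions).
# Running it also needs a real Kafka broker and the Java expansion service.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    records = (
        p
        | 'ReadLimited' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['my-topic'],
            # Caps the read at a fixed number of records; under the hood the
            # unbounded source gets wrapped into a bounded one, which is where
            # BoundedReadFromUnboundedSource comes into play.
            max_num_records=1000))
```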
> On 18 Sep 2023, at 09:28, Jan Lukavský <[email protected]> wrote:
>
> Do we have a defined way for a PTransform to create a bounded PCollection from an unbounded one (a typical example would be LIMIT acting on unbounded input)? AFAIK, we can use SDF to manipulate the watermark, but that requires terminating the Pipeline even though there are still upstream running transforms (e.g. sources). I'm not sure if we have a sound definition of when a runner should terminate a Pipeline, so I guess this is runner-dependent, right? If I'm not wrong, Flink, for example, does not terminate a Pipeline while there is at least one running operator, so this might require signalling sources from the sink (thus introducing some form of cycle).
>
> Jan
>
> On 9/15/23 18:55, Robert Bradshaw via user wrote:
>> On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user <[email protected]> wrote:
>>> Creating composite DoFns is tricky today due to how they are implemented (via annotated methods).
>>
>> Note that this depends on the language. This should be really easy to do from Python.
>>
>>> However, providing such a method to compose DoFns would be very useful IMO.
>>
>> +1
>>
>>> On Fri, Sep 15, 2023 at 9:33 AM Joey Tran <[email protected]> wrote:
>>>> Yeah, for (1) the concern would be adding a shuffle/fusion break, and (2) sounds like the likely solution. I was just hoping there'd be one that could wrap at the PTransform level, but I realize now the PTransform abstraction is too general, as you mentioned, to do something like that.
>>>>
>>>> (2) will likely be what we do, though now I'm wondering if it might be possible to create a ParDo wrapper that can take a ParDo, extract its DoFn, wrap it, and return a new ParDo.
>>>>
>>>> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <[email protected]> wrote:
>>>>> +1 to looking at composite transforms. You could even have a composite transform that takes another transform as one of its construction arguments and whose expand method does pre- and post-processing to the inputs/outputs before/after applying the transform in question. (You could even implement this as a Python decorator if you really wanted, either decorating the expand method itself or the full class...)
>>>>>
>>>>> One of the difficulties is that for a general transform there isn't necessarily a 1:N relationship between outputs and inputs as one has for a DoFn (especially if there is any aggregation involved). There are, however, two partial solutions that might help.
>>>>>
>>>>> (1) You can do a CombineGlobally with a CombineFn (like Sample) that returns at most N elements. You could do this with a CombinePerKey if you can come up with a reasonable key (e.g. the id of your input elements) that the limit should be applied to. Note that this may cause a lot of data to be shuffled (though due to combiner lifting, no more than N per bundle).
>>>>>
>>>>> (2) You could have a DoFn that limits to N per bundle by initializing a counter in its start_bundle and passing elements through until the counter reaches a threshold. (Again, one could do this per id if one is available.) It wouldn't stop production of the elements, but if things get fused it would still likely be fairly cheap.
>>>>>
>>>>> Both of these could be prepended to the problematic consuming PTransform as well.
>>>>>
>>>>> - Robert
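To make those two options concrete, here is a rough, untested Python sketch. Option (1) uses the built-in Sample combiner; option (2) is the per-bundle counter described above. The helper names (cap_globally, LimitPerBundle) and the limit values are made up for the example.

```python
import apache_beam as beam
from apache_beam.transforms import combiners


# Option (1): a CombineFn-based cap. Sample.FixedSizeGlobally emits a single
# list of at most n elements, so it is flattened back into individual
# elements afterwards. This adds a shuffle, but combiner lifting keeps it to
# at most n elements per bundle.
def cap_globally(pcoll, n):
    return (
        pcoll
        | 'SampleUpToN' >> combiners.Sample.FixedSizeGlobally(n)
        | 'Reflatten' >> beam.FlatMap(lambda sampled: sampled))


# Option (2): a per-bundle limiter. It does not stop upstream production,
# but if things get fused it cheaply drops everything past the first n
# elements of each bundle.
class LimitPerBundle(beam.DoFn):
    def __init__(self, n):
        self._n = n

    def start_bundle(self):
        self._emitted = 0

    def process(self, element):
        if self._emitted < self._n:
            self._emitted += 1
            yield element
```

Either could then be attached right before the expensive consuming transform, e.g. `capped = outputs | 'Cap' >> beam.ParDo(LimitPerBundle(1000))`.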
>>>>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran <[email protected]> wrote:
>>>>>> I'm aware of composite transforms and of the distributed nature of PTransforms. I'm not suggesting limiting the entire set, and my example was more illustrative than the actual use case.
>>>>>>
>>>>>> My actual use case is basically: I have multiple PTransforms, and let's say most of them average ~100 generated outputs for a single input. Most of these PTransforms will occasionally run into an input, though, that might produce maybe 1M outputs. This can cause issues if, for example, there are transforms that follow it that require a lot of compute per input.
>>>>>>
>>>>>> The simplest way to deal with this is to modify the `DoFn`s in our PTransforms and add a limiter in the logic (e.g. `if num_outputs_generated >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across our transforms, but it'd be much cleaner if we could lift this limiting logic out of the application logic and have some generic wrapper that extends our transforms.
>>>>>>
>>>>>> Thanks for the discussion!
>>>>>>
>>>>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <[email protected]> wrote:
>>>>>>> I don’t think it’s possible to extend in the way that you are asking (like Java classes “extend"). Though, you can create your own composite PTransform that incorporates one or several others inside its “expand()” method. Actually, most of the Beam native PTransforms are composite transforms. Please take a look at the doc and examples [1].
>>>>>>>
>>>>>>> Regarding your example, please be aware that all PTransforms are supposed to be executed in a distributed environment and the order of records is not guaranteed. So, limiting the whole output to a fixed number of records can be challenging - you’d need to make sure that it is processed on only one worker, which means that you’d need to shuffle all your records by the same key and probably sort the records in the way that you need.
>>>>>>>
>>>>>>> Did you consider using “org.apache.beam.sdk.transforms.Top” for that? [2]
>>>>>>>
>>>>>>> If it doesn’t work for you, could you provide more details of your use case? Then we can probably propose more suitable solutions for it.
>>>>>>>
>>>>>>> [1] https://beam.apache.org/documentation/programming-guide/#composite-transforms
>>>>>>> [2] https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
>>>>>>>
>>>>>>> —
>>>>>>> Alexey
>>>>>>>
>>>>>>>> On 15 Sep 2023, at 14:22, Joey Tran <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Is there a way to extend already-defined PTransforms? My question is probably better illustrated with an example. Let's say I have a PTransform that generates a very variable number of outputs. I'd like to "wrap" that PTransform such that if it ever creates more than, say, 1,000 outputs, then I just take the first 1,000 outputs without generating the rest.
>>>>>>>>
>>>>>>>> It'd be trivial if I had access to the DoFn, but what if the PTransform in question doesn't expose the `DoFn`?
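Coming back to the original question of wrapping a PTransform that doesn't expose its DoFn: here is a hedged sketch of the composite-wrapper idea discussed above. The names (LimitedOutput, limit) are made up, and note the caveats: the Sample-based cap shuffles data, it keeps a random sample rather than literally "the first N" (element order is not defined on a distributed PCollection), and it does not stop the wrapped transform from producing its outputs in the first place.

```python
import apache_beam as beam
from apache_beam.transforms import combiners


class LimitedOutput(beam.PTransform):
    """Composite wrapper: applies `inner`, then keeps at most `limit` outputs.

    Illustrative only; see the caveats in the surrounding discussion.
    """

    def __init__(self, inner, limit):
        super().__init__()
        self._inner = inner
        self._limit = limit

    def expand(self, pcoll):
        return (
            pcoll
            # Apply the wrapped transform unchanged.
            | 'Inner' >> self._inner
            # Cap the result to at most `limit` elements (as a single list),
            # then flatten back into individual elements.
            | 'CapToLimit' >> combiners.Sample.FixedSizeGlobally(self._limit)
            | 'Reflatten' >> beam.FlatMap(lambda sampled: sampled))


# Hypothetical usage with some existing transform that may fan out heavily:
#   capped = inputs | 'Capped' >> LimitedOutput(VeryProlificTransform(), 1000)
```

As noted earlier in the thread, neither this nor the per-bundle variant actually stops the wrapped transform from doing the work of producing the extra elements; truly cutting generation short still means putting the limit inside the DoFn itself (or reaching for an SDF-based approach).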
