Got it. In that case, CoGroupByKey is a good tool. I'm not sure how best to configure the window for your use case. Because you are only merging data that you split within the same window, I'd expect small windows to be enough, but I don't know how best to tune them.
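To make the merging step concrete, here is a rough plain-Java model of what the join has to do for each key: hold the first branch result until the remaining ones arrive, then emit the group and drop the buffered state. This is only a sketch and not Beam code; `BranchJoiner`, `add`, and `pendingKeys` are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of a per-key join. Hypothetical names, not Beam API. */
class BranchJoiner<V> {
    private final int expectedBranches;
    // Results buffered per key until every branch has reported.
    private final Map<String, List<V>> pending = new HashMap<>();

    BranchJoiner(int expectedBranches) {
        if (expectedBranches < 1) {
            throw new IllegalArgumentException("expectedBranches must be >= 1");
        }
        this.expectedBranches = expectedBranches;
    }

    /** Adds one branch result; returns the full group once all branches have arrived. */
    Optional<List<V>> add(String key, V value) {
        List<V> group = pending.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(value);
        if (group.size() < expectedBranches) {
            return Optional.empty(); // still waiting for another branch
        }
        pending.remove(key); // state for this key is no longer needed
        return Optional.of(group);
    }

    /** Number of keys that are still waiting for at least one branch. */
    int pendingKeys() {
        return pending.size();
    }
}
```

In this model, `pendingKeys()` is the amount of state being held at any moment. In a pipeline, the window plays the role of bounding how long an unmatched key can stay buffered, which is why a shorter window should mean less state held at once.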
On Thu, Aug 10, 2023 at 11:40 AM Ruben Vargas <ruben.var...@metova.com> wrote:

> Hello, thank you very much for the reply.
>
> I was thinking of branching because I have some heavy processes that I
> would like to distribute to other workers and scale independently of the
> lighter processes.
>
> Does that make sense?
>
> On Wed, Aug 9, 2023 at 12:16 PM John Casey via user <user@beam.apache.org>
> wrote:
>
>> Depending on the specifics of your processing, it may be simpler to just
>> do both transforms within a single ParDo.
>>
>> i.e.
>>
>> pipeline.apply(kafka.read())
>>     .apply(ParDo.of(new UserTransform()));
>>
>> public static class UserTransform extends DoFn<KafkaRecord, Object> {
>>
>>   @ProcessElement
>>   public void processElement(@Element KafkaRecord record,
>>       OutputReceiver<Object> receiver) {
>>     Type1 part1 = something(record);
>>     Type2 part2 = somethingElse(record);
>>     MergedType merged = merge(part1, part2);
>>     receiver.output(merged);
>>   }
>> }
>>
>> On Wed, Jul 26, 2023 at 11:43 PM Ruben Vargas <ruben.var...@metova.com>
>> wrote:
>>
>>> Hello?
>>>
>>> Any advice on how to do what I described? I could only find examples for
>>> bounded data, not for streaming.
>>>
>>> Also, can I get invited to Slack?
>>>
>>> Thank you very much!
>>>
>>> On Fri, Jul 21, 2023 at 9:34, Ruben Vargas <ruben.var...@metova.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm starting to use Beam and I would like to know if there is any
>>>> recommended pattern for doing the following:
>>>>
>>>> I have a message coming from Kafka, and I would like to apply two
>>>> different transformations and merge them into a single result at the
>>>> end. I attached an image that describes the pipeline.
>>>>
>>>> Each message has its own unique key.
>>>>
>>>> What I'm doing is using a session window with an elementCountAtLeast
>>>> trigger, with the count equal to the number of processes I expect to
>>>> generate results (in the case of the diagram, 2).
>>>>
>>>> This is the code fragment I used to construct the window:
>>>>
>>>> Window<KV<String, OUTPUT>> joinWindow =
>>>>     Window.<KV<String, OUTPUT>>into(
>>>>             Sessions.withGapDuration(Duration.standardSeconds(60)))
>>>>         .triggering(
>>>>             Repeatedly.forever(AfterPane.elementCountAtLeast(nProcessWait)))
>>>>         .discardingFiredPanes()
>>>>         .withAllowedLateness(Duration.ZERO);
>>>>
>>>> and then a CoGroupByKey to join all of the results. Is this a
>>>> recommended approach, or is there another recommended way? What happens
>>>> if at some point I have a lot of windows "open"?
>>>>
>>>> Thank you very much!