Depending on the specifics of your processing, it may be simpler to do
both transforms within a single ParDo (which also avoids the window/trigger
bookkeeping and the shuffle needed to rejoin the two branches), e.g.:
pipeline
    .apply(KafkaIO.<String, String>read() /* ... source configuration ... */)
    .apply(ParDo.of(new UserTransform()));

public static class UserTransform extends DoFn<KafkaRecord<String, String>, MergedType> {
  @ProcessElement
  public void processElement(@Element KafkaRecord<String, String> record,
                             OutputReceiver<MergedType> receiver) {
    // Run both transformations on the same element...
    Type1 part1 = something(record);
    Type2 part2 = somethingElse(record);
    // ...then merge the partial results and emit a single output.
    MergedType merged = merge(part1, part2);
    receiver.output(merged);
  }
}
On Wed, Jul 26, 2023 at 11:43 PM Ruben Vargas <[email protected]>
wrote:
>
> Hello?
>
> Any advice on how to do what I described? I can only find examples for
> bounded data, not for streaming.
>
>
>
> Also, can I get invited to Slack?
>
> Thank you very much!
>
> On Fri, Jul 21, 2023 at 9:34, Ruben Vargas <[email protected]> wrote:
>
>> Hello,
>>
>> I'm starting to use Beam and I would like to know if there is a
>> recommended pattern for doing the following:
>>
>> I have a message coming from Kafka and then I would like to apply two
>> different transformations and merge them into a single result at the end. I
>> attached an image that describes the pipeline.
>>
>> Each message has its own unique key.
>>
>> What I'm doing is using a Session window with an elementCountAtLeast
>> trigger, where the count equals the number of processes I expect to
>> generate results (in the case of the diagram it would be 2).
>>
>> This is the code fragment I used to construct the window:
>>
>> Window<KV<String, OUTPUT>> joinWindow =
>>     Window.<KV<String, OUTPUT>>into(Sessions.withGapDuration(Duration.standardSeconds(60)))
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(nProcessWait)))
>>         .discardingFiredPanes()
>>         .withAllowedLateness(Duration.ZERO);
>>
>>
>> and then a CoGroupByKey to join all of the results. Is this a
>> recommended approach, or is there a better way? What happens if at
>> some point I have a lot of windows "open"?
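>>
>> For reference, the join itself looks roughly like this (a simplified sketch
>> using CoGroupByKey / KeyedPCollectionTuple from the Java SDK; part1Results,
>> part2Results, Type1, Type2, MergedType and merge() are placeholders for my
>> actual types, and both inputs are keyed by the message key and windowed with
>> joinWindow):
>>
>> final TupleTag<Type1> part1Tag = new TupleTag<>();
>> final TupleTag<Type2> part2Tag = new TupleTag<>();
>>
>> PCollection<KV<String, CoGbkResult>> joined =
>>     KeyedPCollectionTuple.of(part1Tag, part1Results)
>>         .and(part2Tag, part2Results)
>>         .apply(CoGroupByKey.create());
>>
>> joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, MergedType>() {
>>   @ProcessElement
>>   public void processElement(@Element KV<String, CoGbkResult> element,
>>                              OutputReceiver<MergedType> out) {
>>     // Each branch is expected to emit exactly one result per key,
>>     // so getOnly() is enough here.
>>     Type1 part1 = element.getValue().getOnly(part1Tag);
>>     Type2 part2 = element.getValue().getOnly(part2Tag);
>>     out.output(merge(part1, part2));
>>   }
>> }));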
>>
>>
>> Thank you very much!
>>
>>