Registration open for Community Over Code North America
Hello! Registration is still open for the upcoming Community Over Code NA event in Halifax, NS! We invite you to register for the event at https://communityovercode.org/registration/

Apache committers, note that you have a special discounted rate for the conference at US$250. To take advantage of this rate, use the special code sent to the committers@ list by Brian Proffitt earlier this month.

If you need an invitation letter, please consult the information at https://communityovercode.org/visa-letter/

Please see https://communityovercode.org/ for more information about the event, including how to reserve discounted hotel rooms in Halifax. Discounted rates will only be available until Sept. 5, so reserve soon!

--Rich, for the event planning team
Re: How can we get multiple side inputs from a single pipeline?
This looks fine. One caveat: there currently appears to be a bug in Beam when you apply a combiner followed by View.asSingleton. I would recommend replacing these lines:

    .apply(Latest.globally())
    .apply(View.asSingleton())

with the following:

    .apply(Reify.timestamps())
    .apply(Combine.globally(Latest.combineFn()).asSingletonView())

On Mon, Aug 28, 2023 at 8:30 AM Sachin Mittal wrote:
> Can I achieve this by using "Tags for multiple outputs"?
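A fuller sketch of the suggested replacement in context follows. It assumes the Beam Java SDK core plus a runner (for example the direct runner) on the classpath, and assumes the side input is a Map<String, String> as on the linked pattern page. LatestSideInput and fetchConfig are hypothetical names standing in for Sachin's pipeline and external query. The explicit NullableCoder is a defensive addition of mine, not part of the advice above: Latest's combiner produces null for an empty input, and a plain MapCoder cannot encode null.

```java
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class LatestSideInput {
  // Hypothetical stand-in for the external lookup performed in the original DoFn.
  static Map<String, String> fetchConfig(Instant timestamp) {
    return Collections.singletonMap("refreshedAt", timestamp.toString());
  }

  static PCollectionView<Map<String, String>> buildView(Pipeline p) {
    return p
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
        .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
          @ProcessElement
          public void process(@Timestamp Instant timestamp, OutputReceiver<Map<String, String>> out) {
            out.output(fetchConfig(timestamp));
          }
        }))
        // Defensive assumption: the combiner's default for an empty input is null,
        // so give the values a coder that can encode null.
        .setCoder(NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
        .apply(Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        // Replaces .apply(Latest.globally()).apply(View.asSingleton()):
        // attach each element's timestamp, keep the latest value, and
        // materialize the combined result directly as a singleton view.
        .apply(Reify.timestamps())
        .apply(Combine.globally(Latest.<Map<String, String>>combineFn()).asSingletonView());
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollectionView<Map<String, String>> view = buildView(p);
    // Downstream ParDos would consume `view` via withSideInputs(view).
    // p.run() is omitted: GenerateSequence.withRate makes the pipeline unbounded.
    System.out.println("Built side input view: " + view);
  }
}
```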
Re: How can we get multiple side inputs from a single pipeline?
Hi Sachin,

Yes, this seems fine to me: your DoFn can output to specific tags, and then you can use PCollectionTuple.get(tagX), PCollectionTuple.get(tagY), followed by View.asSingleton, View.asList, etc., to create different PCollectionView instances.

Just be careful: you might need different triggers for each view. The one that you linked will work well for a singleton, but may not be appropriate for producing lists.

Let us know how it goes!

Best,
Bruno

On Mon, Aug 28, 2023 at 11:30 AM Sachin Mittal wrote:
> Can I achieve this by using "Tags for multiple outputs"?
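A minimal sketch of the tagged-output approach described above, under the same assumptions (Beam Java SDK core plus a runner on the classpath). The class MultiSideInputs, the three tags, and the sample values are hypothetical placeholders for whatever Sachin's single external query returns. One design choice differs from a literal View.asList: the list is emitted as a single List<String> element and read with View.asSingleton, so every view can reuse the same refresh trigger. If you do want View.asList over individual elements, the trigger caveat above applies.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class MultiSideInputs {
  // One tag per side input. The trailing {} creates an anonymous subclass so
  // Beam can recover each output's element type and infer its coder.
  static final TupleTag<Map<String, String>> CONFIG_TAG = new TupleTag<Map<String, String>>() {};
  static final TupleTag<Map<String, Long>> LIMITS_TAG = new TupleTag<Map<String, Long>>() {};
  static final TupleTag<List<String>> NAMES_TAG = new TupleTag<List<String>>() {};

  // The refresh trigger from the linked side-input pattern, reused per output.
  static <T> Window<T> refreshWindow() {
    return Window.<T>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .discardingFiredPanes();
  }

  static List<PCollectionView<?>> build(Pipeline p) {
    // Stands in for the external query: one call per tick, several derived
    // results, each written to its own tagged output.
    PCollectionTuple outputs = p
        .apply("Ticks", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
        .apply("FetchOnce", ParDo.of(new DoFn<Long, Map<String, String>>() {
          @ProcessElement
          public void process(@Timestamp Instant timestamp, MultiOutputReceiver out) {
            out.get(CONFIG_TAG).output(Collections.singletonMap("refreshedAt", timestamp.toString()));
            out.get(LIMITS_TAG).output(Collections.singletonMap("maxRows", 100L));
            out.get(NAMES_TAG).output(Arrays.asList("alpha", "beta"));
          }
        }).withOutputTags(CONFIG_TAG, TupleTagList.of(LIMITS_TAG).and(NAMES_TAG)));

    // PCollectionTuple.get(tag) returns each output as its own PCollection.
    PCollectionView<Map<String, String>> configView = outputs.get(CONFIG_TAG)
        .apply("WindowConfig", MultiSideInputs.<Map<String, String>>refreshWindow())
        .apply("ConfigView", View.asSingleton());
    PCollectionView<Map<String, Long>> limitsView = outputs.get(LIMITS_TAG)
        .apply("WindowLimits", MultiSideInputs.<Map<String, Long>>refreshWindow())
        .apply("LimitsView", View.asSingleton());
    // The list travels as one element, so View.asSingleton works here too.
    PCollectionView<List<String>> namesView = outputs.get(NAMES_TAG)
        .apply("WindowNames", MultiSideInputs.<List<String>>refreshWindow())
        .apply("NamesView", View.asSingleton());

    // A downstream ParDo declares and reads all three views.
    p.apply("Events", GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1L)))
        .apply("UseSideInputs", ParDo.of(new DoFn<Long, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            Map<String, String> config = c.sideInput(configView);
            Map<String, Long> limits = c.sideInput(limitsView);
            List<String> names = c.sideInput(namesView);
            c.output(c.element() + " " + config + " " + limits + " " + names);
          }
        }).withSideInputs(configView, limitsView, namesView));

    return Arrays.asList(configView, limitsView, namesView);
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    build(p);
    // p.run() is omitted: GenerateSequence.withRate makes the pipeline unbounded.
  }
}
```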
How can we get multiple side inputs from a single pipeline?
Hi,

I was checking the code for side input patterns:
https://beam.apache.org/documentation/patterns/side-inputs/

Basically, I need multiple side inputs from the "Slowly updating global window side inputs" pattern.

Following the example, my pipeline is something like this:

    PCollectionView<Map<String, String>> map =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
            .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
              @ProcessElement
              public void process(@Element Long input, @Timestamp Instant timestamp,
                  OutputReceiver<Map<String, String>> o) {
                o.output(/* output a map */);
                // also output another map and a list, is this possible?
              }
            }))
            .apply(Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
            .apply(Latest.globally())
            .apply(View.asSingleton());

As an extension of this example, the same DoFn that fetches the side input may also need to produce another Map and a List alongside the map. The reason I need to do this in the same DoFn is that this function queries external sources to build the side input, and the other side inputs are built from the same source.

So I would like to avoid querying the external sources multiple times from different DoFns, and instead use the same function to generate multiple side inputs.

Can I achieve this by using "Tags for multiple outputs"?

Thanks,
Sachin