Thank you, Luke. I changed DefaultTrigger.of() to AfterProcessingTime.pastFirstElementInPane() and it worked.
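
For later readers, here is a toy sketch of why the trigger change matters, following Luke's explanation quoted below: main-input elements are held back until the side input has fired at least once. This is plain Java, not Beam code. SideInputGate and its methods are hypothetical names made up for illustration, and the real runner logic is certainly more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model, NOT Beam code: main-input elements wait for the first side-input firing. */
final class SideInputGate {
    private List<String> sideInput = null;                      // null means no pane has fired yet
    private final List<String> buffered = new ArrayList<>();    // main elements still waiting
    private final List<String> processed = new ArrayList<>();   // stands in for processElement calls

    /** A main-input element arrives; it is processed only if the side input is ready. */
    void onMainElement(String element) {
        if (sideInput == null) {
            buffered.add(element);
        } else {
            processed.add(element + " with side input " + sideInput);
        }
    }

    /** The side-input trigger fires and publishes a pane; waiting elements are released. */
    void onSideInputFiring(List<String> pane) {
        sideInput = new ArrayList<>(pane);
        for (String element : buffered) {
            processed.add(element + " with side input " + sideInput);
        }
        buffered.clear();
    }

    List<String> processed() { return processed; }

    int bufferedCount() { return buffered.size(); }

    public static void main(String[] args) {
        SideInputGate gate = new SideInputGate();
        gate.onMainElement("a");
        gate.onMainElement("b");
        System.out.println("before firing: processed=" + gate.processed().size()
                + ", buffered=" + gate.bufferedCount());
        gate.onSideInputFiring(Arrays.asList("x"));   // first firing of the side input
        gate.onMainElement("c");
        System.out.println("after firing: processed=" + gate.processed().size()
                + ", buffered=" + gate.bufferedCount());
    }
}
```

In this model, a side input whose trigger never fires leaves "a" and "b" buffered indefinitely, which would match the missing log output. A trigger that fires once the first element arrives releases them.
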

On Mon, Jun 29, 2020 at 9:09 AM Luke Cwik <lc...@google.com> wrote:

> The UpdateFn won't be invoked till the side input is ready which requires
> either the watermark to pass the end of the global window + allowed
> lateness (to show that the side input is empty) or at least one firing to
> populate it with data. See this general section on side inputs[1] and some
> useful patterns[2] (there are some examples for how to get globally
> windowed side inputs to work).
>
> 1: https://beam.apache.org/documentation/programming-guide/#side-inputs
> 2: https://beam.apache.org/documentation/patterns/side-inputs/
>
> On Sun, Jun 28, 2020 at 6:24 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi All - I am facing an issue while using *side-input*.
>>
>> *What am I doing:*
>> From my main program, I am calling a custom PTransform with a
>> PCollectionView as parameter. Inside custom PTransform, I am passing the
>> PCollectionView as a side-input to a DoFn.
>>
>> *Issue:*
>> When I run the pipeline, I am expecting the log statement inside my
>> DoFn's processElement to get executed but it is not getting logged. If I
>> remove the side-input to my DoFn then the log is getting printed. I am
>> suspecting whether it could be related to windowing/execution order or my
>> side-input somehow being empty. Appreciate if you can clarify on what is
>> going wrong here.
>>
>> *Code Structure:*
>>
>> *Main Program:*
>> PCollectionTuple tuple = input.apply(new FirstTx());
>>
>> // Get two tuple tags from first transformation
>> PCollection1 = tuple.get(tag1).setCoder(...);
>> PCollection2 = tuple.get(tag2).setCoder(...);
>>
>> // Converting PCollection1 to PCollectionView to use as a side-input
>> // Note: I need to introduce a global window here as my source is
>> // unbounded and when we use View.asList() it does GroupByKey internally
>> // which inturn demands a window
>> PView = PCollection1
>>     .apply(Window.<KV<String, CustomObject>>into(new GlobalWindows()) // Everything into global window.
>>         .triggering(Repeatedly.forever(DefaultTrigger.of()))
>>         .discardingFiredPanes())
>>     .apply(Values.create())
>>     .apply(View.asList());
>>
>> // Pass PCollectionView to SecondTx as a param
>> PCollection3 = PCollection2.apply(new SecondTx(PView));
>>
>> *SecondTx:*
>> Inside my SecondTx, I am getting the PView from constructor (this.PView =
>> PView) and calling a DoFn
>>
>> public PCollection<CustomObject> expand(PCollection<KV<String, KV<String, CustomObject>>> input) {
>>     input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
>>     ...
>> }
>>
>> // DoFn
>> class UpdateFn extends DoFn<Map<String, Map<String, Map<String, String>>>, CustomObject> {
>>     @ProcessElement
>>     public void processElement(@Element Map<String, Map<String, Map<String, String>>> input,
>>             OutputReceiver<CustomObject> out) {
>>         *Log.of("UpdateFn " + input);*
>>         out.output(new CustomObject());
>>     }
>> }
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

--
Thanks,
Praveen K Viswanathan