Hi Reza,

I just read a blog post [1] that you wrote two years ago, in which you mentioned:

> Because this pattern uses a global-window SideInput, matching to elements 
> being processed will be nondeterministic.

Do you mean that two workers processing the same windowed main input might 
each see a different version of the global-window side input?

Also, if that is the case, do you think the “Flatten, FixedWindows” approach 
below would solve it?

Thanks so much!
Siyu

[1]: https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1

> On Mar 28, 2022, at 7:48 PM, Reza Rokni <r...@google.com> wrote:
> 
> Digging into this a little:
> 
> To simulate the condition, we can use a PCollectionList containing two 
> GenerateSequence sources; that should produce the "more than one element in 
> the View" error. 
> 
> Then, rather than applying the GlobalWindow right after, we do a Flatten, 
> FixedWindows, and Sum. This should ensure that only one value per window 
> reaches the DoFn that calls readTestData(). We then apply the GlobalWindow 
> just before View.asSingleton(). If you remove the FixedWindows and the Sum, 
> the code below should fail; with them in place there is no error, but I need 
> to think about whether that actually covers all bases. 
> 
> // Two sources, each emitting one element every 5 seconds, flattened and
> // reduced to a single summed value per 5-second fixed window.
> PCollection<Long> toMany =
>         PCollectionList.<Long>of(
>                         p.apply(GenerateSequence.from(0)
>                                 .withRate(1, Duration.standardSeconds(5L))))
>                 .and(p.apply(GenerateSequence.from(0)
>                         .withRate(1, Duration.standardSeconds(5L))))
>                 .apply(Flatten.pCollections())
>                 .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
>                 .apply(Sum.longsGlobally().withoutDefaults());
> 
> // Create a side input that updates once per fixed window (every 5 seconds here).
> PCollectionView<Map<String, String>> map =
>         toMany
>                 .apply(
>                         ParDo.of(
>                                 new DoFn<Long, Map<String, String>>() {
> 
>                                     @ProcessElement
>                                     public void process(
>                                             @Element Long input,
>                                             OutputReceiver<Map<String, String>> o) {
>                                         // Replace map with test data from the
>                                         // placeholder external service.
>                                         // Add external reads here.
>                                         o.output(PlaceholderExternalService.readTestData());
>                                     }
>                                 }))
>                 .apply(
>                         Window.<Map<String, String>>into(new GlobalWindows())
>                                 .triggering(
>                                         Repeatedly.forever(
>                                                 AfterProcessingTime.pastFirstElementInPane()))
>                                 .discardingFiredPanes())
>                 .apply(View.asSingleton());
> 
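To make the effect of the Flatten, FixedWindows, and Sum steps concrete, here is a minimal sketch in plain Java. It is a toy model, not Beam code: the class `FixedWindowSumModel` and its method `sumPerWindow` are hypothetical names, timestamps are assumed to be non-negative whole seconds, and triggers, watermarks, and late data are ignored. It only shows that elements from two flattened sources that land in the same fixed window collapse into one summed value.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model (plain Java, not Beam) of Flatten + fixed windows + a global sum per window. */
final class FixedWindowSumModel {

    /**
     * Assigns each element to the fixed window containing its timestamp and sums the
     * values per window. Assumes non-negative timestamps expressed in whole seconds.
     */
    static Map<Long, Long> sumPerWindow(long[] timestamps, long[] values, long windowSeconds) {
        Map<Long, Long> sums = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long windowStart = (timestamps[i] / windowSeconds) * windowSeconds;
            sums.merge(windowStart, values[i], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Two sources each emit the values 0, 1, 2 at t = 0 s, 5 s, 10 s; this is the flattened input.
        long[] timestamps = {0, 0, 5, 5, 10, 10};
        long[] values = {0, 0, 1, 1, 2, 2};
        // Six input elements become three outputs, one per 5-second window.
        System.out.println(sumPerWindow(timestamps, values, 5)); // prints {0=0, 5=2, 10=4}
    }
}
```

In this model each window yields exactly one value no matter how many sources feed it. Whether a real runner can still deliver more than one of those per-window values in a single pane of the later global window is not something the sketch addresses.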
> 
> 
> On Thu, Mar 3, 2022 at 6:25 PM John Gerassimou <john.gerassi...@unity3d.com> wrote:
> Has anyone gotten a chance to investigate? Our team likes the benefits of 
> this pattern. We are hesitant to use it in production as it presents a risk.
> 
> On Tue, Feb 22, 2022 at 3:11 PM Reza Rokni <r...@google.com> wrote:
> Yup, this looks to be a buggy example. The bug is probably not encountered 
> often because one property of this example is that it's non-deterministic 
> as to when the values will be pushed to workers. Most users will set a 
> larger value for the duration, which would then hit the bug only in very 
> rare cases.
> 
> I will try to run some tests using Reuven's Latest method and update the 
> example. 
> 
> On Tue, Feb 22, 2022 at 11:29 AM John Gerassimou <john.gerassi...@unity3d.com> wrote:
> I also had issues using this pattern. In most cases, it works fine, but the 
> duplicate error showed up after 4000 or so triggers using a 30-second timer. 
> I've tried to apply aggregation before View Singleton to enforce a single 
> element, but that didn't solve the issue.
> 
> Setting the timer to 5-minutes seemed to alleviate (or delay) the problem but 
> I need to do more testing. 
> 
> On Tue, Feb 22, 2022 at 2:22 PM Reuven Lax <re...@google.com> wrote:
> elementCountAtLeast only guarantees a lower bound on the elements in a pane. 
> No upper bound is guaranteed.
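
The lower-bound-only behaviour can be illustrated with a small plain-Java sketch. It is a toy model, not Beam: `CountTriggerModel` and `paneSizes` are hypothetical names, and the model assumes the trigger condition is checked once per bundle rather than after every element, which is an assumption about runner behaviour and not something this thread confirms.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (plain Java, not Beam) of a count trigger whose condition is checked per bundle. */
final class CountTriggerModel {

    /**
     * Returns the size of each fired pane. ASSUMPTION: the "at least N" condition is
     * evaluated only after a whole bundle has been buffered, so a pane may exceed N.
     */
    static List<Integer> paneSizes(int[] bundleSizes, int atLeast) {
        List<Integer> panes = new ArrayList<>();
        int buffered = 0;
        for (int size : bundleSizes) {
            buffered += size;
            if (buffered >= atLeast) {
                panes.add(buffered); // fire with everything buffered so far
                buffered = 0;        // start a new pane after firing
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        // The third bundle carries three elements, so its pane has three elements
        // even though the threshold is 1.
        System.out.println(paneSizes(new int[] {1, 1, 3, 1}, 1)); // prints [1, 1, 3, 1]
    }
}
```

Every pane in the model satisfies the lower bound, but nothing caps its size, which is why a singleton view downstream can still see more than one element.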
> 
> On Tue, Feb 22, 2022 at 11:16 AM Steve Niemitz <sniem...@apache.org> wrote:
> Does "Repeatedly.forever(AfterPane.elementCountAtLeast(1))" solve this?  At 
> least in my tests it seems to correctly emit only a single element per pane, 
> but I'm not sure how much of a guarantee there actually is that there will 
> never be more than N elements in a pane when elementCountAtLeast(N) is set.
> 
> On Tue, Feb 22, 2022 at 2:06 PM Luke Cwik <lc...@google.com> wrote:
> I'm not certain that Latest would work since the processing time trigger 
> would still cause multiple firings to occur each producing the "latest" at 
> that point in time. All these firings would effectively be output to the 
> PCollection that the view is over. The PCollection would effectively be a 
> concatenation of all these firings.
> 
> 
> 
> On Tue, Feb 22, 2022 at 10:57 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:
> I also did not succeed in making this pattern work some time ago. In the link 
> below there's my mail thread with code example - do you have a similar 
> use-case?
> 
> https://lists.apache.org/thread/9l74o4vqbtfgc5vkj9qq0xofffmtxswc
> 
> Will keep watching this thread for insights.
> 
> Best Regards,
> Pavel Solomin
> 
> Tel: +351 962 950 692 | Skype: pavel_solomin | 
> LinkedIn: https://www.linkedin.com/in/pavelsolomin
> 
> 
> 
> 
> On Tue, 22 Feb 2022 at 18:46, Steve Niemitz <sniem...@twitter.com> wrote:
> We had a team try to use the "slowly updating global window side inputs" 
> pattern (on Dataflow) to update some metadata in their pipeline every minute, 
> but they surprisingly ran into errors saying that the side input PCollection 
> contained more than one element [1], although this only manifested 
> intermittently. 
> 
> My theory on why this breaks is as follows. Can someone check my logic?
> 
> Given that GenerateSequence operates on processing time (although this might 
> not actually matter), it's possible that if processing of the source is 
> delayed for whatever reason, the source may emit multiple elements at once in 
> a single bundle.  For example, if I configure the source to generate an 
> element every 10 seconds, and the evaluation of the source is delayed for 30 
> seconds, I'd get a bundle with 3 elements in it (or so it seems).  All 
> elements are then windowed into the global window, so they all end up in the 
> same window.
> 
> If a bundle with 3 elements enters the 
> AfterProcessingTime.pastFirstElementInPane() state machine, all 3 elements 
> will be emitted in that pane.  This will then propagate down and break on the 
> singleton view combiner.
> 
> Is my thought process here correct?  Is the example here just buggy?
> 
> [1] "pcollection view being accessed as a singleton despite having more than 
> one input."
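
The theory above can be sketched as a toy model in plain Java. It is not Beam code: `DelayedSourceModel`, `catchUpBundle`, and `asSingleton` are hypothetical names, and the model simply assumes that a stalled periodic source emits every overdue element in one bundle and that the whole bundle lands in one pane.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (plain Java, not Beam) of a periodic source that catches up after a stall. */
final class DelayedSourceModel {

    /**
     * Returns the sequence values that became due during a stall. ASSUMPTION: the
     * source emits all overdue elements together, in a single bundle.
     */
    static List<Long> catchUpBundle(long periodSeconds, long stallSeconds) {
        List<Long> bundle = new ArrayList<>();
        for (long due = periodSeconds; due <= stallSeconds; due += periodSeconds) {
            bundle.add(due / periodSeconds - 1); // sequence values 0, 1, 2, ...
        }
        return bundle;
    }

    /** Mimics a singleton view: it rejects any pane that does not hold exactly one element. */
    static <T> T asSingleton(List<T> pane) {
        if (pane.size() != 1) {
            throw new IllegalStateException(
                    "singleton view over " + pane.size() + " elements");
        }
        return pane.get(0);
    }

    public static void main(String[] args) {
        // One element every 10 seconds, evaluation delayed by 30 seconds.
        List<Long> pane = catchUpBundle(10, 30);
        System.out.println("pane = " + pane); // prints pane = [0, 1, 2]
        try {
            asSingleton(pane);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

With a 10-second period and a 30-second stall the model produces a three-element pane, matching the example in the message, and the singleton check rejects it.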
