Hi Lukasz,

My solution throws a java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn

I'm now setting a fixed window on the ticks (as shown in the Stack Overflow example) and again a couple of lines later on the Map. I've tried removing one or both of them, but I keep getting the same error. The other part of my pipeline (the main input) is in the global window at that point, so there seems to be a window mismatch, but I'm not sure how to resolve it.
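The error above is about how a side input is looked up. For each main-input window, the side input's WindowFn has to name a matching side-input window. What follows is a simplified, stdlib-only Java model of that lookup. It is not Beam code. `SideInputWindowModel`, `MainWindow` and `FixedWindow` are hypothetical names. The mapping rule (take the main window's last timestamp and place it in a fixed window) is an assumption about how Beam's fixed-window mapping behaves. The model shows why a main input in the global window cannot be matched to a side input windowed with FixedWindows, whatever size the side input's windows have.

```java
/**
 * Simplified model of side-input window mapping. NOT Beam's API: all names here
 * are hypothetical, and the mapping rule is an assumption for illustration.
 */
final class SideInputWindowModel {

    /** A main-input window: the single global window, or a bounded interval [start, end). */
    record MainWindow(boolean global, long startMillis, long endMillis) {
        static MainWindow globalWindow() {
            return new MainWindow(true, Long.MIN_VALUE, Long.MAX_VALUE);
        }

        static MainWindow interval(long startMillis, long endMillis) {
            return new MainWindow(false, startMillis, endMillis);
        }

        /** Last timestamp that still belongs to this window. */
        long maxTimestamp() {
            return endMillis - 1;
        }
    }

    /** A fixed side-input window [startMillis, startMillis + sizeMillis). */
    record FixedWindow(long startMillis, long sizeMillis) {}

    /**
     * Picks the fixed side-input window for a main-input window by assigning the
     * main window's last timestamp to a fixed window of the given size.
     */
    static FixedWindow sideInputWindowFor(MainWindow main, long sizeMillis) {
        if (main.global()) {
            // The global window has no finite end, so no single fixed window can
            // stand in for it: this is the mismatch behind the exception in the thread.
            throw new IllegalArgumentException(
                "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        long ts = main.maxTimestamp();
        // floorMod keeps window starts aligned for negative timestamps too.
        long start = ts - Math.floorMod(ts, sizeMillis);
        return new FixedWindow(start, sizeMillis);
    }
}
```

Under this model, the fix is to make the two windowings compatible. You can window the main input into fixed windows as well. Or you can keep the side input in the global window, for example with a repeated processing-time trigger, so that the global main window maps to the global side-input window.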
On Tue, Sep 4, 2018 at 19:11, Lukasz Cwik <lc...@google.com> wrote:

> Jose, what Bart is recommending is a path that should work.
>
> Bart, what do you mean by conflicting windows?
>
> On Mon, Sep 3, 2018 at 11:29 PM Bart Aelterman <bart.aelter...@gmail.com>
> wrote:
>
>> Hi Jose,
>>
>> You could generate a sequence of "ticks" and use that as input to
>> continuously update your side input. This is what is suggested in this
>> Stack Overflow post: https://stackoverflow.com/a/41271159/1805725.
>> However, CountingInput apparently no longer exists (at least, I can't
>> find it).
>>
>> I've been working on this problem myself for the last couple of days. I'm
>> trying to read a file from storage, convert it into a Map and pass that as
>> a side input. Here is what I've come up with so far; however, I'm still
>> resolving issues with conflicting windows, so this code *does not work*:
>>
>> PCollection<Long> ticks = p
>>     // Produce 1 "tick" per 10 seconds
>>     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
>>     // Window the ticks into 1-minute windows
>>     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>>
>> PCollectionView<Map<String, Long>> mapping = ticks
>>     .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
>>     .apply(ParDo.of(new MapFn())) // turns the String (JSON data) into a Map (KV<String, Long>)
>>     .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
>>         .triggering(Repeatedly.forever(AfterProcessingTime
>>             .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
>>         .withAllowedLateness(Duration.ZERO)
>>         .discardingFiredPanes())
>>     .apply(View.<String, Long>asMap());
>>
>> If you manage to get a complete working example, would you mind
>> sharing it too, please?
>>
>> Thanks,
>>
>> Bart
>>
>> On Tue, Sep 4, 2018 at 08:05, Jose Bermeo <j...@propertyfinder.ae> wrote:
>>
>>> Hi.
>>>
>>> I'm currently building a real-time pipeline to process user
>>> interactions, and I have to filter those interactions against a blacklist.
>>> I used a side input to store the list. The problem is that I need to fetch
>>> new entries for the blacklist every day.
>>>
>>> I don't want to restart the pipeline to re-create the side input. My second
>>> option was to move the blacklist to Redis and fetch the table in a
>>> @StartBundle method, but since bundles are going to be small, I would end
>>> up making thousands of calls to Redis, and that will only get worse as the
>>> table grows.
>>>
>>> What other options do I have?
>>>
>>> Thanks.
>>>
>>
>> --
>> Kind regards,
>>
>> Bart Aelterman
>> Freelance data scientist
>> http://www.bart-aelterman.com
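Jose's second option reads the blacklist from Redis in @StartBundle. That costs one Redis round trip per bundle, so small bundles make it expensive. A different technique, not proposed in this thread, is to load the whole list once and reuse it until a refresh interval has passed. Below is a minimal stdlib-only Java sketch of that idea, with these assumptions:

- `RefreshingBlacklist` is a hypothetical class.
- The `loader` stands in for a single bulk read from Redis. No real Redis client API is shown.
- The clock is injected so the refresh logic can be tested without waiting.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Beam or any Redis client): keeps the blacklist
 * in memory and reloads it at most once per refresh interval, not once per bundle.
 */
final class RefreshingBlacklist {
    private final Supplier<Set<String>> loader; // assumption: one bulk read of the full list, e.g. from Redis
    private final LongSupplier clockMillis;     // injectable clock so the refresh logic is testable
    private final long refreshMillis;
    private Set<String> current;
    private long loadedAtMillis;
    private int loadCount;

    RefreshingBlacklist(Supplier<Set<String>> loader, LongSupplier clockMillis, Duration refresh) {
        this.loader = loader;
        this.clockMillis = clockMillis;
        this.refreshMillis = refresh.toMillis();
    }

    /** Returns true if the id is blacklisted, reloading the list first if it is missing or stale. */
    synchronized boolean isBlacklisted(String id) {
        long now = clockMillis.getAsLong();
        if (current == null || now - loadedAtMillis >= refreshMillis) {
            current = Set.copyOf(loader.get()); // immutable snapshot of the latest list
            loadedAtMillis = now;
            loadCount++;
        }
        return current.contains(id);
    }

    /** Number of times the loader has been called so far. */
    synchronized int loadCount() {
        return loadCount;
    }
}
```

In a Beam DoFn, the DoFn itself is serialized, so such a helper would presumably be created in @Setup and held in a transient field. With a one-day interval, each DoFn instance would then call Redis roughly once a day instead of once per bundle. The trade-off is staleness: new blacklist entries become visible only after the next reload.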