Hi Lukasz,

My solution yields a java.lang.IllegalArgumentException: "Attempted to get
side input window for GlobalWindow from non-global WindowFn".
I am now setting a fixed window on the ticks (as shown in the Stack Overflow
example) and, a couple of lines later, on the Map. I've tried removing one or
both of them, but I keep getting the same error. The other part of my
pipeline is in the global window at that point, so there seems to be a
mismatch, but I'm not sure how to resolve it.
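
For anyone following along, a simplified model of why this combination fails may help. It is plain Java, not Beam's API, and every class, record and method name in it is hypothetical. It assumes the default side-input window mapping for fixed windows works as in Beam's PartitioningWindowFn: take the main-input window's max timestamp and find the fixed side-input window that contains it. Beam rejects a GlobalWindow main-input window at that step, which is where the message above comes from.

```java
import java.util.Objects;

/**
 * Toy model, NOT Beam's API: how a side-input window is looked up for a
 * main-input window. All names here are hypothetical.
 */
public class SideInputWindowModel {

    /** A window is either the single global window or an interval [start, end) in millis. */
    public interface Win {}

    public record GlobalWin() implements Win {}

    public record IntervalWin(long start, long end) implements Win {
        /** Last instant that still belongs to the window. */
        public long maxTimestamp() {
            return end - 1;
        }
    }

    /** Side input in fixed windows of sizeMillis (no offset): pick the window holding the main window's max timestamp. */
    public static Win mapToFixedSideWindow(Win mainWindow, long sizeMillis) {
        if (mainWindow instanceof GlobalWin) {
            // Mirrors the check that produces the error in this thread.
            throw new IllegalArgumentException(
                "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        long ts = ((IntervalWin) mainWindow).maxTimestamp();
        long start = ts - Math.floorMod(ts, sizeMillis);
        return new IntervalWin(start, start + sizeMillis);
    }

    /** Side input in the global window: every main-input window maps to it. */
    public static Win mapToGlobalSideWindow(Win mainWindow) {
        Objects.requireNonNull(mainWindow, "mainWindow");
        return new GlobalWin();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long day = 24 * 60 * minute;

        // Main input in 1-minute windows, side input in 1-day windows: works.
        System.out.println(mapToFixedSideWindow(new IntervalWin(0, minute), day));

        // Main input and side input both in the global window: works.
        System.out.println(mapToGlobalSideWindow(new GlobalWin()));

        // Main input in the global window, side input in fixed windows: fails.
        try {
            mapToFixedSideWindow(new GlobalWin(), day);
        } catch (IllegalArgumentException e) {
            System.out.println("IllegalArgumentException: " + e.getMessage());
        }
    }
}
```

In this model the lookup succeeds either if the main input is also put into fixed windows, or if the side input stays in the global window. The latter is the approach usually suggested for slowly updating side inputs: Window.into(new GlobalWindows()) with a Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()) trigger and discardingFiredPanes(), so each new read of the file produces a fresh pane. Treat this as a direction to try rather than a verified fix.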



On Tue, Sep 4, 2018 at 7:11 PM Lukasz Cwik <lc...@google.com> wrote:

> Jose, what Bart is recommending is a path that should work.
>
> Bart, what do you mean by conflicting windows?
>
>
> On Mon, Sep 3, 2018 at 11:29 PM Bart Aelterman <bart.aelter...@gmail.com>
> wrote:
>
>> Hi Jose,
>>
>>
>> You could generate a sequence of "ticks" and use them as input to
>> continuously update your side input. This is what is suggested in this
>> Stack Overflow post: https://stackoverflow.com/a/41271159/1805725.
>> However, CountingInput apparently no longer exists (at least, I can't
>> find it).
>>
>> I've been working on this problem myself for the last couple of days. I'm
>> trying to read a file from storage, convert it into a Map and pass that as a
>> side input. Here is what I've come up with so far; however, I'm still
>> resolving issues with conflicting windows, so this code *does not work*:
>>
>> PCollection<Long> ticks = p
>>     // Produce 1 "tick" per 10 seconds
>>     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
>>     // Window the ticks into 1-minute windows
>>     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>>
>> PCollectionView<Map<String, Long>> mapping = ticks
>>     .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
>>     .apply(ParDo.of(new MapFn())) // turns the String (JSON data) into a Map (KV<String, Long>)
>>     .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
>>         .triggering(Repeatedly.forever(AfterProcessingTime
>>             .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
>>         .withAllowedLateness(Duration.ZERO)
>>         .discardingFiredPanes())
>>     .apply(View.<String, Long>asMap());
>>
>>
>> If you manage to get a complete working example, would you mind
>> sharing it too, please?
>>
>> Thanks,
>>
>> Bart
>>
>> On Tue, Sep 4, 2018 at 8:05 AM Jose Bermeo <j...@propertyfinder.ae> wrote:
>>
>>> Hi.
>>>
>>> Currently, I'm building a real-time pipeline to process user
>>> interactions, and I have to filter interactions based on a black-list. I
>>> used a side input to store the list. The problem is that I'm required to
>>> fetch new elements for the black-list every day.
>>>
>>> I don't want to restart the pipeline to re-create the side input. My
>>> second option was to move the black-list to Redis and fetch the table in a
>>> @StartBundle method, but since bundles are going to be small, I'd be making
>>> thousands of calls to Redis, and that will get harder as the table grows.
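
An alternative to fetching on every @StartBundle is to cache the fetched black-list per DoFn instance and reload it only once it is older than a TTL. The sketch below is plain Java and the RefreshingCache name is hypothetical. In a Beam DoFn it could be held in a transient field created in a @Setup method, with the loader wrapping whatever Redis client is used (not shown). Each DoFn instance would then query Redis roughly once per TTL rather than once per bundle.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.function.Supplier;

/**
 * Hypothetical helper: keeps a loaded value and reloads it only once it is at
 * least `ttl` old, so callers can ask for it on every bundle cheaply.
 */
public class RefreshingCache<T> {
    private final Supplier<T> loader;       // e.g. wraps a Redis fetch (not shown)
    private final Duration ttl;
    private final Supplier<Instant> clock;  // injectable so tests need not wait a day
    private T value;
    private Instant loadedAt;               // null until the first load
    private int loads;

    public RefreshingCache(Supplier<T> loader, Duration ttl, Supplier<Instant> clock) {
        this.loader = loader;
        this.ttl = ttl;
        this.clock = clock;
    }

    /** Returns the cached value, reloading first if it was never loaded or has expired. */
    public synchronized T get() {
        Instant now = clock.get();
        if (loadedAt == null || !now.isBefore(loadedAt.plus(ttl))) {
            value = loader.get();
            loadedAt = now;
            loads++;
        }
        return value;
    }

    /** How many times the loader has been called. */
    public synchronized int loadCount() {
        return loads;
    }

    public static void main(String[] args) {
        Instant[] now = {Instant.parse("2018-09-04T00:00:00Z")};
        RefreshingCache<Set<String>> blackList = new RefreshingCache<>(
            () -> Set.of("user-1", "user-2"),  // stand-in for the real black-list fetch
            Duration.ofDays(1),
            () -> now[0]);

        // Many small bundles on the same day trigger only one load.
        for (int bundle = 0; bundle < 1_000; bundle++) {
            blackList.get();
        }
        System.out.println(blackList.loadCount());  // 1

        // A day later, the next call reloads the list.
        now[0] = now[0].plus(Duration.ofDays(1));
        blackList.get();
        System.out.println(blackList.loadCount());  // 2
    }
}
```

A runner may create several DoFn instances per worker, and each one reloads independently, so the number of Redis calls scales with the number of instances rather than the number of bundles.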
>>>
>>> What other options do I have?
>>>
>>> Thanks.
>>>
>>
>>
>> --
>> Kind regards,
>>
>> Bart Aelterman
>> Freelance data scientist
>> http://www.bart-aelterman.com
>>
>>

-- 
Kind regards,

Bart Aelterman
Freelance data scientist
http://www.bart-aelterman.com
