Hi,

It seems that "Fold cannot be used with a merging WindowAssigner", and the
workaround I found was to use a generic window function. So it looks like I
would need to use window apply anyway. The functionality is then all there,
but I am really concerned about resource utilisation. We have a lot of
concurrent users, they generate a lot of events, and sessions may be long.
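
To illustrate the concern, here is a rough plain-Java sketch (no Flink
classes; SessionBufferingSketch and its methods are made-up names). It
assumes, as I understand it, that a generic window function needs all events
of the session buffered until the window fires, while a fold only keeps one
accumulator per session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java illustration only; none of these types are Flink classes.
final class SessionBufferingSketch {

    // Incremental style (like a fold): only the accumulator is retained.
    static <A, E> A foldIncrementally(Iterable<E> events, A initial,
                                      BiFunction<A, E, A> folder) {
        A acc = initial;
        for (E e : events) {
            acc = folder.apply(acc, e);
        }
        return acc;
    }

    // Buffered style (like a generic window function): every event is
    // retained until the session ends.
    static <E> List<E> bufferAll(Iterable<E> events) {
        List<E> buffer = new ArrayList<>();
        for (E e : events) {
            buffer.add(e);
        }
        return buffer;
    }

    public static void main(String[] args) {
        List<Integer> events = new ArrayList<>();
        for (int i = 1; i <= 1000; i++) {
            events.add(i);
        }
        long sum = foldIncrementally(events, 0L, (acc, e) -> acc + e);
        List<Integer> buffered = bufferAll(events);
        System.out.println("fold-style state: 1 accumulator, sum = " + sum);
        System.out.println("apply-style state: " + buffered.size() + " events");
    }
}
```

With long sessions and many concurrent users, the buffered variant is the
one whose state grows with the number of events per session.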

The workaround you gave for initialisation is exactly what I was already
doing, and yes, it is dynamic enough that I cannot use the constructor.
However, I would also need to gracefully close the resources I open, and as
the initialisation is quite heavy, it felt odd to put it in the fold function
to be run on the first event processed.
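
For reference, the lifecycle I am after looks roughly like the plain-Java
sketch below (no Flink classes; LazyResourceSketch, process and the
StringBuilder stand-in are made-up for illustration). The heavy setup runs
once on the first event, but something still has to call close() at the end,
and that is the hook I am missing in fold.

```java
// Plain-Java illustration only; not a Flink API.
final class LazyResourceSketch implements AutoCloseable {

    private StringBuilder resource; // stand-in for a heavy resource
    private int initCount = 0;
    private boolean closed = false;

    // Heavy initialisation, performed lazily on the first event.
    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        if (resource == null) {
            resource = new StringBuilder();
            initCount++;
        }
    }

    // Processes one event and returns the total length seen so far.
    int process(String event) {
        ensureOpen();
        resource.append(event);
        return resource.length();
    }

    int initCount() {
        return initCount;
    }

    boolean isClosed() {
        return closed;
    }

    // Graceful shutdown; a caller has to invoke this explicitly.
    @Override
    public void close() {
        resource = null;
        closed = true;
    }

    public static void main(String[] args) {
        try (LazyResourceSketch sketch = new LazyResourceSketch()) {
            sketch.process("ab");
            sketch.process("c");
            System.out.println("initialised " + sketch.initCount() + " time(s)");
        }
    }
}
```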

Br,
Henri H

On Mon, Jan 2, 2017 at 10:20 PM, Jamie Grier <ja...@data-artisans.com>
wrote:

> Hi Henri,
>
> #1 - This is by design.  Event time advances with the slowest input
> source.  If there are input sources that generate no data this is
> indistinguishable from a slow source.  Kafka topics where some partitions
> receive no data are a problem in this regard -- but there isn't a simple
> solution.  If possible I would address it at the source.
>
> #2 - If it's possible to run these init functions just once when you
> submit the job you can run them in the constructor of your FoldFunction.
> This init will then happen exactly once (on the client) and the constructed
> FoldFunction is then serialized and distributed around the cluster.  If
> this doesn't work because you need something truly dynamic you could also
> accomplish this with a simple local variable in your function.
>
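> class MyFoldFunction extends FoldFunction[O, T] {
>   // O (input type) and T (accumulator type) are placeholders.
>   private var initialized = false
>
>   override def fold(accumulator: T, value: O): T = {
>     if (!initialized) {
>       doInitStuff()  // runs once per function instance, on the first element
>       initialized = true
>     }
>
>     doNormalStuff()
>   }
> }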
>
>
> #3 - One way to do this is as you've said which is to attach the profile
> information to the event, using a mapper, before it enters the window
> operations.
>
>
> On Mon, Jan 2, 2017 at 1:25 AM, Henri Heiskanen <henri.heiska...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT and
>> what I would like to accomplish is a stream that reads data from
>> multiple Kafka topics, identifies user sessions, uses an external user
>> profile to enrich the data, evaluates a script to produce session
>> aggregates and then creates updated profiles from the session aggregates.
>> I am working with high-volume data and user sessions may be long, so using
>> a generic window apply might not work. Below is a simplified version of the
>> stream.
>>
>> stream = createKafkaStreams(...);
>> env.setParallelism(4);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> stream
>>                 .keyBy(2)
>>                 .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
>>                 .fold(new SessionData(), new SessionFold(), new ProfilerApply())
>>                 .print();
>>
>> The questions:
>>
>> 1. Initially, when I used event time windowing, I could not get any of my
>> windows to close. The reason seemed to be that I had 6 partitions in my
>> test Kafka setup and only 4 of them generated traffic. If I used a
>> parallelism above 4, then no windows were closed. Is this by design or a
>> defect? We use flink-connector-kafka-0.10 because earlier versions did not
>> commit the offsets correctly.
>>
>> 2. Rich fold functions are not supported. However, I would like to execute
>> a piece of custom script in the fold function that requires an
>> initialisation step. I would have used the open and close lifecycle methods
>> of rich functions, but they are not currently available in fold. What would
>> be the preferred way to run some initialisation routines (and close them
>> gracefully) when using fold?
>>
>> 3. Somewhat related to the above: I would also like to fetch a user profile
>> from an external source at the beginning of the session. What would be the
>> best practice for that kind of operation? If I were using the generic window
>> apply, I could fetch it at the beginning of the apply method. I was thinking
>> of introducing a mapper that fetches this profile periodically and caches
>> it in Flink state. However, with this setup I would not be able to tie this
>> to the user sessions identified by the windows.
>>
>> 4. I may also have an additional requirement to write out each event
>> enriched with the current session and profile data. I could basically do
>> this again with a generic window function, writing out each event with the
>> collector while iterating, but is there a better pattern to use? Maybe
>> sharing state between functions or something.
>>
>> Br,
>> Henri H
>>
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
>
>
