Hi,

Actually it seems "Fold cannot be used with a merging WindowAssigner", and the workaround I found was to use a generic window function. So it looks like I would need to use window apply anyway. The functionality is all there, but I am really concerned about resource utilisation: we have quite many concurrent users, they generate a lot of events, and sessions may be long.
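To make the resource concern concrete, here is a plain-Java sketch (no Flink dependency; class and field names are illustrative, not Flink API) of the difference: a generic window apply has to buffer every event of a session in state until the window fires, while an incremental fold keeps only one small accumulator per key.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionAggregationSketch {

    // Incremental aggregation: one small accumulator per session,
    // so state stays constant no matter how long the session is.
    static class SessionData {
        long eventCount = 0;
        long totalBytes = 0;

        void fold(long eventBytes) {
            eventCount++;
            totalBytes += eventBytes;
        }
    }

    // What a generic window apply implies: every event of the session
    // is kept in state until the window fires, then aggregated at once.
    static long aggregateBuffered(List<Long> bufferedEvents) {
        long total = 0;
        for (long b : bufferedEvents) {
            total += b;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> buffered = new ArrayList<>();
        SessionData acc = new SessionData();
        for (long bytes : new long[] {100, 250, 50}) {
            buffered.add(bytes); // buffered: state grows with session length
            acc.fold(bytes);     // incremental: state stays O(1) per key
        }
        // Both strategies produce the same aggregate.
        System.out.println(acc.totalBytes == aggregateBuffered(buffered)); // prints "true"
    }
}
```

Both paths compute the same result; the difference is only in how much state is held per session while it is open, which is what matters with many concurrent long sessions.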
The workaround you gave for initialisation was exactly what I was doing already, and yes, it is so dynamic that you cannot use the constructor. However, I would also need to close the resources I open gracefully, and since the initialisation is quite heavy, it felt odd to put it in the fold function to be done on the first event processed.

Br,
Henri H

On Mon, Jan 2, 2017 at 10:20 PM, Jamie Grier <ja...@data-artisans.com> wrote:

> Hi Henri,
>
> #1 - This is by design. Event time advances with the slowest input
> source. If there are input sources that generate no data, this is
> indistinguishable from a slow source. Kafka topics where some partitions
> receive no data are a problem in this regard -- but there isn't a simple
> solution. If possible I would address it at the source.
>
> #2 - If it's possible to run these init functions just once when you
> submit the job, you can run them in the constructor of your FoldFunction.
> This init will then happen exactly once (on the client), and the
> constructed FoldFunction is then serialized and distributed around the
> cluster. If this doesn't work because you need something truly dynamic,
> you could also accomplish this with a simple local variable in your
> function.
>
>> class MyFoldFunction extends FoldFunction {
>>   private var initialized = false
>>
>>   def fold(accumulator: T, value: O): T = {
>>     if (!initialized) {
>>       doInitStuff()
>>       initialized = true
>>     }
>>     doNormalStuff()
>>   }
>> }
>
> #3 - One way to do this is, as you've said, to attach the profile
> information to the event, using a mapper, before it enters the window
> operations.
>
> On Mon, Jan 2, 2017 at 1:25 AM, Henri Heiskanen <henri.heiska...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a few questions related to Flink streaming.
>> I am on 1.2-SNAPSHOT, and what I would like to accomplish is a stream
>> that reads data from multiple Kafka topics, identifies user sessions,
>> uses an external user profile to enrich the data, evaluates a script to
>> produce session aggregates, and then creates updated profiles from the
>> session aggregates. I am working with high-volume data and user sessions
>> may be long, so using a generic window apply might not work. Below is a
>> simplification of the stream.
>>
>> stream = createKafkaStreams(...);
>> env.setParallelism(4);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> stream
>>     .keyBy(2)
>>     .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
>>     .fold(new SessionData(), new SessionFold(), new ProfilerApply())
>>     .print();
>>
>> The questions:
>>
>> 1. Initially, when I used event-time windowing, I could not get any of my
>> windows to close. The reason seemed to be that I had 6 partitions in my
>> test Kafka setup and only 4 of them generated traffic. If I used
>> parallelism above 4, then no windows were closed. Is this by design or a
>> defect? We use flink-connector-kafka-0.10 because earlier versions did
>> not commit the offsets correctly.
>>
>> 2. Rich fold functions are not supported. However, I would like to
>> execute a piece of custom script in the fold function that requires an
>> initialisation step. I would have used the open and close lifecycle
>> methods of rich functions, but they are not available for fold. What
>> would be the preferred way to run some initialisation routines (and close
>> them gracefully) when using fold?
>>
>> 3. Kind of related to the above. I would also like to fetch a user
>> profile from an external source at the beginning of the session. What
>> would be a best practice for that kind of operation? If I were using the
>> generic window apply, I could fetch it at the beginning of the apply
>> method.
>> I was thinking of introducing a mapper that fetches this profile
>> periodically and caches it in Flink state. However, with this setup I
>> would not be able to tie it to the user sessions identified by the
>> windows.
>>
>> 4. I may also have an additional requirement of writing out each event
>> enriched with the current session and profile data. I basically could do
>> this again with a generic window function and write out each event with
>> the collector when iterating, but would there be a better pattern to use?
>> Maybe sharing state between functions or something.
>>
>> Br,
>> Henri H
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
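As a follow-up on question 3, here is a minimal plain-Java sketch of the "mapper that fetches and caches profiles" idea. This is not Flink API: the loader function, TTL behaviour, and the Event/Profile shapes (plain strings here) are all hypothetical stand-ins, and in a real job the cache would live in keyed state inside a map operator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ProfileCachingMapper {

    // A cached profile plus the time it was fetched.
    static class Entry {
        final String profile;
        final long fetchedAt;
        Entry(String profile, long fetchedAt) {
            this.profile = profile;
            this.fetchedAt = fetchedAt;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private final Function<String, String> loader; // stands in for the external profile store
    private final long ttlMillis;                  // how long a cached profile stays fresh

    ProfileCachingMapper(Function<String, String> loader, long ttlMillis) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
    }

    // Enrich one event with the user's profile, refetching after the TTL.
    String enrich(String userId, String event, long nowMillis) {
        Entry e = cache.get(userId);
        if (e == null || nowMillis - e.fetchedAt > ttlMillis) {
            e = new Entry(loader.apply(userId), nowMillis); // periodic refetch
            cache.put(userId, e);
        }
        return event + "|" + e.profile;
    }

    public static void main(String[] args) {
        ProfileCachingMapper mapper =
            new ProfileCachingMapper(id -> "profile-" + id, 1000);
        System.out.println(mapper.enrich("u1", "click", 0));
    }
}
```

The limitation raised in question 3 still applies: this enrichment happens per event, before windowing, so it is not tied to the session boundaries the window assigner later discovers.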