Hi,

Unfortunately I cannot use a reduce function.
I am now going with a WindowFunction and will see how it performs under our production load.

Br,
Henkka

On Wed, Jan 4, 2017 at 2:46 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Henri,
>
> can you express the logic of your FoldFunction (or WindowFunction) as a
> combination of ReduceFunction and WindowFunction [1]?
> ReduceFunction should be supported by a merging WindowAssigner and has the
> same resource consumption as a FoldFunction, i.e., a single record per
> window.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation
>
> 2017-01-03 12:32 GMT+01:00 Henri Heiskanen <henri.heiska...@gmail.com>:
>
>> Hi,
>>
>> Actually it seems that "Fold cannot be used with a merging WindowAssigner",
>> and the workaround I found was to use a generic window function. It seems
>> that I would need to use window apply anyway. The functionality is then all
>> there, but I am really concerned about resource utilisation. We have quite
>> a lot of concurrent users, they generate a lot of events, and sessions may
>> be long.
>>
>> The workaround you gave for initialisation was exactly what I was doing
>> already, and yes, it is so dynamic that you cannot use the constructor.
>> However, I would also need to close the resources I open gracefully, and
>> as the initialisation is quite heavy, it felt odd to put it in the fold
>> function to be run on the first event processed.
>>
>> Br,
>> Henri H
>>
>> On Mon, Jan 2, 2017 at 10:20 PM, Jamie Grier <ja...@data-artisans.com>
>> wrote:
>>
>>> Hi Henri,
>>>
>>> #1 - This is by design. Event time advances with the slowest input
>>> source. If there are input sources that generate no data, this is
>>> indistinguishable from a slow source. Kafka topics where some partitions
>>> receive no data are a problem in this regard, but there isn't a simple
>>> solution. If possible I would address it at the source.
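To make Jamie's #1 concrete, here is a toy model in plain Java; it does not use any Flink API. It assumes, as a simplification, one input per Kafka partition and uses each event timestamp directly as that input's watermark. The operator's event-time clock is the minimum over all inputs, so an input that never sees data keeps the clock at its starting value and no event-time window can fire. The class name `MinWatermark` is hypothetical; in Flink itself an operator takes the minimum of the watermarks of its input channels, so a source subtask that only reads idle partitions would presumably hold everything back in the same way.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not Flink code: the event-time clock is the minimum watermark
// over all inputs (assumption: one input per Kafka partition).
final class MinWatermark {
    private final Map<Integer, Long> perPartition = new HashMap<>();

    MinWatermark(int partitions) {
        // A partition that has delivered nothing is still at the lowest possible time.
        for (int p = 0; p < partitions; p++) {
            perPartition.put(p, Long.MIN_VALUE);
        }
    }

    // Use the event timestamp directly as the partition's watermark
    // (simplification: no out-of-orderness allowance). It never moves backwards.
    void observe(int partition, long timestamp) {
        perPartition.merge(partition, timestamp, Long::max);
    }

    // Event time advances only as far as the slowest (or idle) input.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : perPartition.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermark clock = new MinWatermark(6);
        for (int p = 0; p < 4; p++) {
            clock.observe(p, 1_000_000L); // only 4 of the 6 partitions get traffic
        }
        System.out.println(clock.currentWatermark() == Long.MIN_VALUE);
        clock.observe(4, 1_000_000L);
        clock.observe(5, 1_000_000L);
        System.out.println(clock.currentWatermark());
    }
}
```

With six partitions of which only four receive events, the model's watermark stays at `Long.MIN_VALUE` (the first line printed is `true`); it only advances to `1000000` once the two idle partitions also deliver data.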
>>>
>>> #2 - If it's possible to run these init functions just once when you
>>> submit the job, you can run them in the constructor of your FoldFunction.
>>> This init will then happen exactly once (on the client), and the
>>> constructed FoldFunction is then serialized and distributed around the
>>> cluster. If this doesn't work because you need something truly dynamic,
>>> you could also accomplish this with a simple flag field in your function:
>>>
>>>   import org.apache.flink.api.common.functions.FoldFunction
>>>
>>>   // O = input element type, T = accumulator type
>>>   abstract class MyFoldFunction[O, T] extends FoldFunction[O, T] {
>>>     // Not serialized: each parallel instance starts with false
>>>     // and initializes on its first element.
>>>     @transient private var initialized = false
>>>
>>>     def doInitStuff(): Unit
>>>     def doNormalStuff(accumulator: T, value: O): T
>>>
>>>     override def fold(accumulator: T, value: O): T = {
>>>       if (!initialized) {
>>>         doInitStuff()
>>>         initialized = true
>>>       }
>>>       doNormalStuff(accumulator, value)
>>>     }
>>>   }
>>>
>>> #3 - One way to do this is as you've said: attach the profile
>>> information to the event, using a mapper, before it enters the window
>>> operations.
>>>
>>>
>>> On Mon, Jan 2, 2017 at 1:25 AM, Henri Heiskanen <henri.heiska...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT,
>>>> and what I would like to accomplish is a stream that reads data from
>>>> multiple Kafka topics, identifies user sessions, uses an external user
>>>> profile to enrich the data, evaluates a script to produce session
>>>> aggregates, and then creates updated profiles from the session
>>>> aggregates. I am working with high-volume data and user sessions may be
>>>> long, so using a generic window apply might not work. Below is a
>>>> simplified version of the stream.
>>>>
>>>>   stream = createKafkaStreams(...);
>>>>   env.setParallelism(4);
>>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>   stream
>>>>       .keyBy(2)
>>>>       .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
>>>>       .fold(new SessionData(), new SessionFold(), new ProfilerApply())
>>>>       .print();
>>>>
>>>> The questions:
>>>>
>>>> 1. Initially, when I used event-time windowing, I could not get any of
>>>> my windows to close. The reason seemed to be that I had 6 partitions in
>>>> my test Kafka setup and only 4 of them generated traffic. If I used a
>>>> parallelism above 4, then no windows were closed. Is this by design or a
>>>> defect? We use flink-connector-kafka-0.10 because earlier versions did
>>>> not commit the offsets correctly.
>>>>
>>>> 2. Rich fold functions are not supported. However, I would like to
>>>> execute a piece of custom script in the fold function that requires an
>>>> initialisation step. I would have used the open and close lifecycle
>>>> methods of rich functions, but they are not available in fold. What
>>>> would be the preferred way to run some initialisation routines (and
>>>> close them gracefully) when using fold?
>>>>
>>>> 3. Kind of related to the above: I would also like to fetch a user
>>>> profile from an external source at the beginning of the session. What
>>>> would be the best practice for that kind of operation? If I were using
>>>> the generic window apply, I could fetch it at the beginning of the apply
>>>> method. I was thinking of introducing a mapper that fetches the profile
>>>> periodically and caches it in Flink state. However, with this setup I
>>>> would not be able to tie it to the user sessions identified by the
>>>> windows.
>>>>
>>>> 4. I may also have an additional requirement of writing out each event
>>>> enriched with the current session and profile data. I could basically
>>>> do this again with a generic window function and write out each event
>>>> with the collector while iterating, but would there be a better pattern
>>>> to use? Maybe sharing state between functions or something.
>>>>
>>>> Br,
>>>> Henri H
>>>>
>>>
>>> --
>>>
>>> Jamie Grier
>>> data Artisans, Director of Applications Engineering
>>> @jamiegrier <https://twitter.com/jamiegrier>
>>> ja...@data-artisans.com
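Fabian's suggestion relies on the fact that a reduce function can also combine two partial results, which is what merging session windows need: when an out-of-order event bridges two sessions, their single-record aggregates must collapse into one. A fold function, by contrast, takes an accumulator and an input element, so it cannot by itself combine two accumulators; that is one way to read the "Fold cannot be used with a merging WindowAssigner" error. The sketch below is a plain-Java model of the idea, not Flink code. `Stats`, `Session`, `REDUCE`, `assign` and `emit` are hypothetical names; each event is assumed to be pre-mapped into a `Stats` partial aggregate (a reduce function's input and output types are the same, so a map step before the window is one way to get there), and `emit` stands in for the WindowFunction that sees only the reduced value plus the window bounds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Toy model, not the Flink API: incremental aggregation on merging session windows.
final class SessionReduceDemo {

    // Partial aggregate; reduce input and output share this type, so partials can merge.
    static final class Stats {
        final long count;
        final long sum;
        Stats(long count, long sum) { this.count = count; this.sum = sum; }
    }

    // One session window holding exactly one reduced value, range [start, end).
    static final class Session {
        long start;
        long end;
        Stats acc;
        Session(long start, long end, Stats acc) { this.start = start; this.end = end; this.acc = acc; }
    }

    static final long GAP = 10 * 60 * 1000L; // 10-minute gap, as in the thread's example

    // Associative combiner: used per event and when two sessions merge.
    static final BinaryOperator<Stats> REDUCE =
        (a, b) -> new Stats(a.count + b.count, a.sum + b.sum);

    static List<Session> assign(long[] timestamps, long[] values) {
        List<Session> sessions = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            // Each event opens its own window [ts, ts + GAP) ...
            Session s = new Session(timestamps[i], timestamps[i] + GAP, new Stats(1, values[i]));
            List<Session> kept = new ArrayList<>();
            for (Session other : sessions) {
                // ... and absorbs every existing session it overlaps.
                if (other.start < s.end && s.start < other.end) {
                    s.start = Math.min(s.start, other.start);
                    s.end = Math.max(s.end, other.end);
                    s.acc = REDUCE.apply(other.acc, s.acc);
                } else {
                    kept.add(other);
                }
            }
            kept.add(s);
            sessions = kept;
        }
        return sessions;
    }

    // "Window function" step: sees the single reduced value plus window metadata.
    static String emit(Session s) {
        return "session [" + s.start + ", " + s.end + ") count=" + s.acc.count + " sum=" + s.acc.sum;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long[] ts = {0, 5 * min, 30 * min, 2 * min}; // last event arrives out of order
        long[] vals = {1, 2, 3, 4};
        for (Session s : assign(ts, vals)) {
            System.out.println(emit(s));
        }
    }
}
```

Running `main` yields two sessions: the event at minute 30 stays on its own (`count=1 sum=3`), while the events at minutes 0, 5 and the late one at minute 2 end up in a single session `[0, 900000)` with `count=3 sum=7`. However many events a session receives, it only ever stores one `Stats` record, which is the resource property Fabian points to.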