Hi,

Unfortunately, I cannot use a reduce function.

I am now going with a WindowFunction and will see how it works with our
production load.

Br,
Henkka

On Wed, Jan 4, 2017 at 2:46 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Henri,
>
> can you express the logic of your FoldFunction (or WindowFunction) as a
> combination of ReduceFunction and WindowFunction [1]?
> ReduceFunction should be supported by a merging WindowAssigner and has the
> same resource consumption as a FoldFunction, i.e., a single record per
> window.
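The pattern described above can be sketched in plain Java, without any Flink dependency. All class and field names here are hypothetical; the point is only that each event is first mapped to the aggregate type, partial aggregates are merged pairwise, and a final function runs once per window on the single remaining record.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Illustrative only: models the reduce-then-finalize pattern without any Flink dependency.
class IncrementalWindowSketch {
    // Hypothetical per-session aggregate; input and output of the reduce step share this type.
    static final class SessionAgg {
        final long eventCount;
        final long lastTimestamp;

        SessionAgg(long eventCount, long lastTimestamp) {
            this.eventCount = eventCount;
            this.lastTimestamp = lastTimestamp;
        }
    }

    // Reduce step: merges two partial aggregates, so only one record is kept per window.
    static final BinaryOperator<SessionAgg> REDUCE = (a, b) ->
        new SessionAgg(a.eventCount + b.eventCount, Math.max(a.lastTimestamp, b.lastTimestamp));

    // Window-function step: runs once per window on the single reduced record.
    static final Function<SessionAgg, String> FINALIZE = agg ->
        "events=" + agg.eventCount + ", last=" + agg.lastTimestamp;

    // Each event timestamp is mapped to a one-event aggregate, then reduced incrementally.
    static String processWindow(List<Long> eventTimestamps) {
        SessionAgg acc = null;
        for (long ts : eventTimestamps) {
            SessionAgg single = new SessionAgg(1, ts);
            acc = (acc == null) ? single : REDUCE.apply(acc, single);
        }
        return acc == null ? "empty" : FINALIZE.apply(acc);
    }
}
```

Because the reduce step takes two values of the same type, two partial aggregates can also be merged with each other, which is what merging session windows requires.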
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation
>
> 2017-01-03 12:32 GMT+01:00 Henri Heiskanen <henri.heiska...@gmail.com>:
>
>> Hi,
>>
>> Actually, it seems that "Fold cannot be used with a merging WindowAssigner", and
>> the workaround I found was to use a generic window function. It seems that I
>> would need to use the window apply anyway. The functionality is then all there,
>> but I am really concerned about the resource utilisation. We have a lot of
>> concurrent users, they generate a lot of events, and sessions may be long.
>>
>> The workaround you gave for initialisation is exactly what I was doing
>> already, and yes, it is so dynamic that I cannot use the constructor. However,
>> I also need to close the resources I open gracefully, and as the
>> initialisation is quite heavy, it felt odd to put it in the fold function to
>> be done when the first event is processed.
>>
>> Br,
>> Henri H
>>
>> On Mon, Jan 2, 2017 at 10:20 PM, Jamie Grier <ja...@data-artisans.com>
>> wrote:
>>
>>> Hi Henri,
>>>
>>> #1 - This is by design.  Event time advances with the slowest input
>>> source.  If there are input sources that generate no data this is
>>> indistinguishable from a slow source.  Kafka topics where some partitions
>>> receive no data are a problem in this regard -- but there isn't a simple
>>> solution.  If possible I would address it at the source.
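To illustrate point #1, here is a minimal plain-Java sketch (not Flink code, and not Flink's actual implementation) of an operator's event-time clock taken as the minimum of its inputs' watermarks. The class and method names are made up for illustration, and `Long.MIN_VALUE` stands in for an input that has not emitted any data yet.

```java
import java.util.Arrays;

// Illustrative only: a simplified model of watermark propagation.
class WatermarkSketch {
    // An input that has produced no data has no watermark yet.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // The operator's event time is the minimum watermark over all of its inputs.
    static long combinedWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    // Simplified firing rule: a window can fire once the combined watermark reaches its end.
    static boolean canFire(long windowEnd, long[] inputWatermarks) {
        return combinedWatermark(inputWatermarks) >= windowEnd;
    }
}
```

In this model a single input that never emits data keeps the minimum pinned at `NO_WATERMARK`, so no window ever fires, however far the other inputs have advanced.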
>>>
>>> #2 - If it's possible to run these init functions just once when you
>>> submit the job you can run them in the constructor of your FoldFunction.
>>> This init will then happen exactly once (on the client) and the constructed
>>> FoldFunction is then serialized and distributed around the cluster.  If
>>> this doesn't work because you need something truly dynamic you could also
>>> accomplish this with a simple local variable in your function.
>>>
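>>> import org.apache.flink.api.common.functions.FoldFunction
>>>
>>> // O = input event type, T = accumulator type
>>> class MyFoldFunction[O, T] extends FoldFunction[O, T] {
>>>   // Starts as false, so the init code runs on the first fold() call
>>>   private var initialized = false
>>>
>>>   override def fold(accumulator: T, value: O): T = {
>>>     if (!initialized) {
>>>       doInitStuff() // placeholder for the one-time setup
>>>       initialized = true
>>>     }
>>>
>>>     doNormalStuff() // placeholder for the per-event logic; must return T
>>>   }
>>> }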
>>>
>>>
>>> #3 - One way to do this is as you've said which is to attach the profile
>>> information to the event, using a mapper, before it enters the window
>>> operations.
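A rough plain-Java sketch of the enrichment step in #3, assuming the profile can be looked up by user id. The `ProfileEnricher` class, the `String`-based event and profile representation, and the lookup function are all hypothetical stand-ins. In a real job this logic would sit in a map function placed before the window operation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative only: hypothetical types, no Flink dependency.
class ProfileEnricher {
    // Stand-in for the call to the external profile store.
    private final Function<String, String> profileLookup;
    // Simple unbounded in-memory cache; a real job would need expiry or managed state.
    private final Map<String, String> cache = new HashMap<>();

    ProfileEnricher(Function<String, String> profileLookup) {
        this.profileLookup = profileLookup;
    }

    // Returns {userId, payload, profile}; the profile is fetched once per user and then cached.
    String[] enrich(String userId, String payload) {
        String profile = cache.computeIfAbsent(userId, profileLookup);
        return new String[] {userId, payload, profile};
    }
}
```

Since every event then carries its profile, the window function downstream does not need to contact the external store itself.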
>>>
>>>
>>> On Mon, Jan 2, 2017 at 1:25 AM, Henri Heiskanen <
>>> henri.heiska...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT,
>>>> and what I would like to accomplish is a stream that reads data
>>>> from multiple Kafka topics, identifies user sessions, uses an external
>>>> user profile to enrich the data, evaluates a script to produce session
>>>> aggregates, and then creates updated profiles from the session aggregates. I am
>>>> working with high-volume data and user sessions may be long, so using the
>>>> generic window apply might not work. Below is a simplification of the
>>>> stream.
>>>>
>>>> stream = createKafkaStreams(...);
>>>> env.setParallelism(4);
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> stream
>>>>     .keyBy(2)
>>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
>>>>     .fold(new SessionData(), new SessionFold(), new ProfilerApply())
>>>>     .print();
>>>>
>>>> The questions:
>>>>
>>>> 1. Initially when I used event time windowing I could not get any of my
>>>> windows to close. The reason seemed to be that I had 6 partitions in my
>>>> test Kafka setup and only 4 of them generated traffic. If I used
>>>> parallelism above 4, then no windows were closed. Is this by design or a
>>>> defect? We use flink-connector-kafka-0.10 because earlier versions did not
>>>> commit the offsets correctly.
>>>>
>>>> 2. Rich fold functions are not supported. However, I would like to execute
>>>> a custom script in the fold function, and it requires an initialisation
>>>> step. I would have used the open and close lifecycle methods of rich
>>>> functions, but they are not available in fold. What would be the
>>>> preferred way to run some initialisation routines (and close them
>>>> gracefully) when using fold?
>>>>
>>>> 3. Somewhat related to the above: I would also like to fetch a user profile
>>>> from an external source at the beginning of the session. What would be the best
>>>> practice for that kind of operation? If I were using the generic window
>>>> apply, I could fetch it at the beginning of the apply method. I was thinking
>>>> of introducing a mapper that fetches this profile periodically and caches
>>>> it in Flink state. However, with this setup I would not be able to tie it
>>>> to the user sessions identified by the windows.
>>>>
>>>> 4. I may also have an additional requirement of writing out each event
>>>> enriched with the current session and profile data. I could basically do this
>>>> again with a generic window function and write out each event with the collector
>>>> when iterating, but would there be a better pattern to use? Maybe sharing
>>>> state between functions or something.
>>>>
>>>> Br,
>>>> Henri H
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> Jamie Grier
>>> data Artisans, Director of Applications Engineering
>>> @jamiegrier <https://twitter.com/jamiegrier>
>>> ja...@data-artisans.com
>>>
>>>
>>
>
