Hello Fabian,
We decided that it does not make sense to key-partition the Kafka topic
because of hot-spot concerns. So we created a way to keep trimmed state in
the Accumulator, provided we know the current watermark, which keeps the
time of the trimmed state correct. In essence, the paths we look for in a
sequence of events in a session are eagerly materialized and emitted using
a periodic CountTrigger, followed by truncation of the state.
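A minimal plain-Java model of the trimming idea, assuming that events at or before the watermark can be treated as final. It deliberately uses no Flink API: `Event`, `TrimmingAccumulator`, `add` and `emitAndTrim` are hypothetical names, and the boolean returned by `add` only mimics what a count-based trigger does. The watermark is passed in explicitly, and that value is exactly what is hard to get hold of inside a real Accumulator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical session event: an event timestamp plus a label. */
final class Event {
    final long timestamp;
    final String label;
    Event(long timestamp, String label) { this.timestamp = timestamp; this.label = label; }
}

/**
 * Plain-Java model (no Flink types) of an accumulator that buffers events and,
 * on each periodic firing, emits the events the watermark has made final and
 * truncates them from its state.
 */
final class TrimmingAccumulator {
    private final List<Event> buffer = new ArrayList<>();
    private final int fireEvery;   // mimics a count-based trigger
    private int sinceLastFire = 0;

    TrimmingAccumulator(int fireEvery) { this.fireEvery = fireEvery; }

    /** Buffers the event; returns true on every fireEvery-th call. */
    boolean add(Event e) {
        buffer.add(e);
        sinceLastFire++;
        if (sinceLastFire >= fireEvery) {
            sinceLastFire = 0;
            return true;
        }
        return false;
    }

    /**
     * Sorts the buffer, then removes and returns every event with
     * timestamp <= watermark. Newer events stay buffered, because events
     * that precede them may still arrive before the watermark passes them.
     */
    List<Event> emitAndTrim(long watermark) {
        buffer.sort(Comparator.comparingLong(e -> e.timestamp));
        int cut = 0;
        while (cut < buffer.size() && buffer.get(cut).timestamp <= watermark) {
            cut++;
        }
        List<Event> finalized = new ArrayList<>(buffer.subList(0, cut));
        buffer.subList(0, cut).clear(); // the truncation step
        return finalized;
    }

    int bufferedCount() { return buffer.size(); }
}
```

The design keeps only the not-yet-final tail of the session in the accumulator, so its size is bounded by how far events run ahead of the watermark rather than by the session length.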

This requires us to know the current watermark in the Accumulator. We do
have the watermark in the Trigger's onElement(), onEventTime() and
onProcessingTime() through the TriggerContext, but I see no way to pass it
on to the Accumulator. Lazily setting the WM on the element, which we
thought was an instance shared between the invocation of add() on the
Accumulator and onElement() on the attached Trigger, does not seem to work
in a distributed environment.

I tried the ProcessWindowFunction too. It was promising, as its process
method has the Context and thus the WM, but it suffers from the same issue:
using windowState() (state keyed by window and key) in a session window
throws

java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$MergingWindowStateStore.getState(WindowOperator.java:720)


Vishal



On Mon, Dec 11, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Perfect. In our use case, the Kafka partition key and the keyBy use the
> exact same field, and thus the order will be preserved.
>
> On Mon, Dec 11, 2017 at 4:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> the order of records that are sent from one task to another task is
>> preserved (task refers to the parallel instance of an operator).
>> However, a task that receives records from multiple input tasks, consumes
>> records from its inputs in arbitrary order.
>>
>> If a job reads from a partitioned Kafka topic and does a keyBy on the
>> partitioning key of the Kafka topic, an operator task that follows the
>> keyBy consumes all records with the same key from exactly one input task
>> (the one reading the Kafka partition for the key).
>> However, since Flink's and Kafka's partitioning functions are not the
>> same, records from the same partition with different keys can be sent to
>> different tasks.
>>
>> So:
>> 1) Records from the same partition might not be processed by the same
>> operator (and hence not in order).
>> 2) Records with the same key are processed by the same operator in the
>> same order in which they were read from the partition.
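A toy model of the two rules above. The input list stands for the records of one Kafka partition in read order, and `flinkTask` is a hypothetical stand-in for the keyBy partitioner (it is deliberately not Flink's real key-group hashing); `Rec` and `ShuffleModel` are illustrative names. Records with the same key always land on the same task in read order, while the partition as a whole is spread over several tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical record: a key plus its position in the Kafka partition. */
final class Rec {
    final String key;
    final int seq;
    Rec(String key, int seq) { this.key = key; this.seq = seq; }
}

final class ShuffleModel {
    /** Hypothetical keyBy partitioner; deliberately NOT Flink's real key-group hash. */
    static int flinkTask(String key, int tasks) {
        return Math.floorMod(key.hashCode() * 31 + 7, tasks);
    }

    /**
     * Sends the records of one partition, in read order, to downstream tasks.
     * Each task's list keeps the order in which records were sent to it.
     */
    static Map<Integer, List<Rec>> route(List<Rec> partitionInOrder, int tasks) {
        Map<Integer, List<Rec>> byTask = new TreeMap<>();
        for (Rec r : partitionInOrder) {
            byTask.computeIfAbsent(flinkTask(r.key, tasks), t -> new ArrayList<>()).add(r);
        }
        return byTask;
    }
}
```

Because the routing function depends only on the key, per-key order survives the shuffle; nothing in the model keeps different keys from the same partition together.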
>>
>> Best,
>> Fabian
>>
>> 2017-12-09 18:09 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> An additional question: if the source (Kafka) is partitioned by key,
>>> does a keyBy retain the order of a Kafka partition across a shuffle?
>>>
>>> On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> I understand that. Let me elaborate. The sequence of events is:
>>>>
>>>> 1. Round-robin dispatch to the Kafka cluster (it is not partitioned on
>>>> the key, which we may ultimately do, and then I will have more questions
>>>> on how to keyBy and still keep order, probably avoiding a shuffle :) ).
>>>> 2. keyBy a high-cardinality key.
>>>> 3. Sessionize.
>>>> 4. Because of the round robin on Kafka (and even if partitioned on the key
>>>> with a subsequent keyBy), the sort order is not retained, and the ACC has
>>>> to hold on to the elements in a List. When the window is finalized, we
>>>> sort the List in the ACC and do pagination. We are looking for paths
>>>> within a session from a source event to a sink event. I was hoping to
>>>> use RocksDB state as the final merged list, and thus off heap, and to use
>>>> a count-based Trigger to evaluate the ACC and merge the collection built
>>>> up between Trigger firings into the master copy, rather than keeping all
>>>> events in the ACC (I would imagine this is a very general pattern).
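A sketch of the sort-then-extract step in item 4, assuming a path starts at a given source event type and ends at the next sink event type. `SessionEvent`, `PathFinder` and the event type strings are hypothetical, and pagination is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical buffered event: event-time timestamp plus an event type. */
final class SessionEvent {
    final long ts;
    final String type;
    SessionEvent(long ts, String type) { this.ts = ts; this.type = type; }
}

final class PathFinder {
    /**
     * Sorts a session's buffered events by timestamp (arrival order is not
     * guaranteed) and returns every path that starts at a source event and
     * ends at the next sink event. Events outside a path are ignored.
     */
    static List<List<String>> paths(List<SessionEvent> buffered, String source, String sink) {
        List<SessionEvent> sorted = new ArrayList<>(buffered);
        sorted.sort(Comparator.comparingLong(e -> e.ts));
        List<List<String>> result = new ArrayList<>();
        List<String> current = null; // null means "not inside a path"
        for (SessionEvent e : sorted) {
            if (current == null) {
                if (e.type.equals(source)) {
                    current = new ArrayList<>();
                    current.add(e.type);
                }
            } else {
                current.add(e.type);
                if (e.type.equals(sink)) {
                    result.add(current);
                    current = null;
                }
            }
        }
        return result;
    }
}
```

Sorting a copy leaves the caller's buffer untouched; a path still open at the end of the input is simply not emitted.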
>>>>
>>>> Does that make sense ?
>>>>
>>>> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> If you use an AggregateFunction in this way (i.e. for a window), the
>>>>> ACC should in fact be kept in the state backend. Did you configure the job
>>>>> to use RocksDB? How are the memory problems manifesting?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> you are right, it is not possible to use state in an AggregateFunction
>>>>> because windows need to be mergeable.
>>>>> An AggregateFunction knows how to merge its accumulators but merging
>>>>> generic state is not possible.
>>>>>
>>>>> I am not aware of an efficient and easy work around for this.
>>>>> If you want to use the provided session window logic, you can use a
>>>>> WindowFunction that performs all computations when the window is
>>>>> triggered.
>>>>> This means that aggregations do not happen eagerly and all events for a
>>>>> window are collected and held in state.
>>>>> Another approach could be to implement the whole logic (incl. the
>>>>> session windowing) using a ProcessFunction. This would be a major effort
>>>>> though.
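To illustrate why mergeability matters, here is a plain-Java model (not Flink code) of event-time sessions: each event opens a window of length `gap`, and any existing sessions it overlaps are merged into it together with their accumulators. That merge step is well-defined for an accumulator with a merge rule, whereas arbitrary user state has no such rule. `Session` and `SessionModel` are hypothetical names, and the strict overlap test may differ from Flink's own TimeWindow semantics at the boundaries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical session: [start, end) plus a stand-in accumulator. */
final class Session {
    long start;
    long end; // exclusive: latest event timestamp + gap
    final List<Long> acc = new ArrayList<>(); // stand-in accumulator: raw timestamps
    Session(long ts, long gap) { start = ts; end = ts + gap; acc.add(ts); }
}

final class SessionModel {
    private final long gap;
    private final List<Session> sessions = new ArrayList<>();
    SessionModel(long gap) { this.gap = gap; }

    /** Opens a window for ts, then absorbs every existing session it overlaps. */
    void add(long ts) {
        Session merged = new Session(ts, gap);
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.start < merged.end && merged.start < s.end) { // strict overlap
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.acc.addAll(s.acc); // the accumulator merge step
                it.remove();
            }
        }
        sessions.add(merged);
    }

    List<Session> sessions() { return sessions; }
}
```

For example, with a gap of 10, events at 0, 30 and 15 form three sessions; an event at 22 bridges [15, 25) and [30, 40), so those two sessions and their accumulators collapse into one.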
>>>>>
>>>>> Best,
>>>>> Fabian
>>>>>
>>>>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>
>>>>>> It seems that this has to do with session windows being mergeable?
>>>>>> I tried the RichWindowFunction, and that seems to suggest that one
>>>>>> cannot use state? Any ideas, folks...
>>>>>>
>>>>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> I have a simple aggregation with one caveat. For some reason I have
>>>>>>> to keep a large amount of state until the window is GCed. The state is
>>>>>>> within the Accumulator (ACC). I am hitting a memory bottleneck and would
>>>>>>> like to offload the state to the state backend (RocksDB), keeping the
>>>>>>> between-checkpoint state in memory (seems to be an obvious fix). I am
>>>>>>> not allowed, though, to use a RichAggregateFunction in the aggregate
>>>>>>> method of a windowed stream. That raises two questions:
>>>>>>>
>>>>>>> 1. Why?
>>>>>>> 2. Is there an alternative for stateful window aggregation where we
>>>>>>> manage the state?
>>>>>>>
>>>>>>> Thanks Vishal
>>>>>>>
>>>>>>>
>>>>>>> Here is the code (generic, but it works):
>>>>>>>
>>>>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>>>>         .keyBy(keySelector)
>>>>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>>>>         .aggregate(
>>>>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>
>>>>>>>                     @Override
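>>>>>>>                     public ACC createAccumulator() {
>>>>>>>                         // Each window starts from a reset copy of the template
>>>>>>>                         // `accumulator` declared outside this snippet.
>>>>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>>>>                         newInstance.resetLocal();
>>>>>>>                         return newInstance;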
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public void add(IN value, ACC accumulator) {
>>>>>>>                         accumulator.add(value);
>>>>>>>
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public OUT getResult(ACC accumulator) {
>>>>>>>                         return accumulator.getLocalValue();
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public ACC merge(ACC a, ACC b) {
>>>>>>>                         a.merge(b);
>>>>>>>                         return a;
>>>>>>>                     }
>>>>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>>>>                     @Override
>>>>>>>                     public void apply(KEY s, TimeWindow window,
>>>>>>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>>>>                         // The AggregateFunction has already reduced the window
>>>>>>>                         // to a single result; forward it unchanged.
>>>>>>>                         out.collect(input.iterator().next());
>>>>>>>                     }
>>>>>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
