Hello Fabian,

We decided that it does not make sense to partition the kafka topic by key because of hot spot considerations. Instead, we created a way to keep trimmed state in the Accumulator, provided we know the current watermark so that the trimming stays correct with respect to event time. In essence, the paths we look for in a session's sequence of events are eagerly materialized and emitted using a periodic CountTrigger, followed by truncation of the state.
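As a rough illustration of the trimmed-state idea (a sketch under assumptions, not the actual job code): the plain-Java classes below, Event and TrimmingAccumulator, are hypothetical names and use no Flink types. The sketch assumes the watermark is somehow handed to emitAndTrim(), and that events at or below the watermark will not be preceded by later arrivals, so they can be sorted, emitted and dropped from the accumulator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type: a name plus an event-time timestamp in milliseconds.
final class Event {
    final String name;
    final long timestamp;

    Event(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

// Hypothetical accumulator that only retains events newer than the last trim point.
final class TrimmingAccumulator {
    private final List<Event> buffer = new ArrayList<>();

    // Events may arrive out of order, so they are simply appended.
    void add(Event event) {
        buffer.add(event);
    }

    // Merging (needed for session windows) concatenates the two buffers.
    void merge(TrimmingAccumulator other) {
        buffer.addAll(other.buffer);
    }

    // Returns all buffered events with timestamp <= watermark, sorted by time,
    // and removes them from the buffer. Assumption: the caller knows the watermark.
    List<Event> emitAndTrim(long watermark) {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event event : buffer) {
            if (event.timestamp <= watermark) {
                ready.add(event);
            } else {
                pending.add(event);
            }
        }
        ready.sort(Comparator.comparingLong((Event ev) -> ev.timestamp));
        buffer.clear();
        buffer.addAll(pending);
        return ready;
    }

    // Number of events still held after trimming.
    int size() {
        return buffer.size();
    }
}
```

In a real job the call to emitAndTrim() would have to happen wherever the watermark is actually available; how to get it there is the open question.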
This requires us to know the current watermark inside the Accumulator. We do have the watermark in the Trigger's onElement(), onEventTime() and onProcessingTime() through the TriggerContext, but I see no way to pass it on to the Accumulator. Lazily setting the WM on the element, which we thought was a shared instance between the invocation of add() on the Accumulator and onElement() on the attached Trigger, does not seem to work in a distributed environment.

I tried the ProcessWindowFunction too. It was promising, as its process() method has the Context and thus the WM, but it too suffers from the same issue when using WindowState ( state keyed to window and key ) in a session window, throwing

java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$MergingWindowStateStore.getState(WindowOperator.java:720)

Vishal

On Mon, Dec 11, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Perfect, in our use case, the kafka partition key and the keyBy use the
> same exact field and thus the order will be preserved.
>
> On Mon, Dec 11, 2017 at 4:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> the order of records that are sent from one task to another task is
>> preserved (task refers to the parallel instance of an operator).
>> However, a task that receives records from multiple input tasks consumes
>> records from its inputs in arbitrary order.
>>
>> If a job reads from a partitioned Kafka topic and does a keyBy on the
>> partitioning key of the Kafka topic, an operator task that follows the
>> keyBy consumes all records with the same key from exactly one input task
>> (the one reading the Kafka partition for the key).
>> However, since Flink's and Kafka's partitioning functions are not the
>> same, records from the same partition with different keys can be sent to
>> different tasks.
>>
>> So:
>> 1) Records from the same partition might not be processed by the same
>> operator (and hence not in order).
>> 2) Records with the same key are processed by the same operator in the
>> same order in which they were read from the partition.
>>
>> Best,
>> Fabian
>>
>> 2017-12-09 18:09 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> An additional question: if the source is key partitioned ( kafka ),
>>> does a keyBy retain the order of a kafka partition across a shuffle ?
>>>
>>> On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> I understand that. Let me elaborate. The sequence of events is
>>>>
>>>> 1. Round robin dispatch to the kafka cluster ( it is not partitioned on
>>>> the key, which we may ultimately do, and then I will have more questions
>>>> on how to key by and still keep order, probably avoiding a shuffle :) ).
>>>> 2. Key by a high cardinality key
>>>> 3. Sessionize
>>>> 4. Because of the round robin on kafka ( and even if partitioned on the
>>>> key with a subsequent key by ), the sort order is not retained and the
>>>> ACC has to hold on to the elements in a List. When the Window is
>>>> finalized we sort the in-ACC List and do pagination. We are looking for
>>>> paths within a session from a source to a sink event. I was hoping to
>>>> use RocksDB state as a final merged list, and thus off heap, and use a
>>>> count based Trigger to evaluate the ACC and merge the inter-Trigger
>>>> collection into the master copy rather than keeping all events in the
>>>> ACC ( I would imagine a very general pattern to use ).
>>>>
>>>> Does that make sense ?
>>>>
>>>> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> If you use an AggregateFunction in this way (i.e. for a window) the
>>>>> ACC should in fact be kept in the state backend. Did you configure the job
>>>>> to use RocksDB?
>>>>> How are the memory problems manifesting?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> you are right, it is not possible to use state in an AggregateFunction
>>>>> because windows need to be mergeable.
>>>>> An AggregateFunction knows how to merge its accumulators but merging
>>>>> generic state is not possible.
>>>>>
>>>>> I am not aware of an efficient and easy workaround for this.
>>>>> If you want to use the provided session window logic, you can use a
>>>>> WindowFunction that performs all computations when the window is
>>>>> triggered.
>>>>> This means that aggregations do not happen eagerly and all events for a
>>>>> window are collected and held in state.
>>>>> Another approach could be to implement the whole logic (incl. the
>>>>> session windowing) using a ProcessFunction. This would be a major effort
>>>>> though.
>>>>>
>>>>> Best,
>>>>> Fabian
>>>>>
>>>>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>
>>>>>> It seems that this has to do with session windows that are mergeable ?
>>>>>> I tried the RichWindowFunction and that seems to suggest that one
>>>>>> cannot use state ? Any ideas folks...
>>>>>>
>>>>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a simple Aggregation with one caveat. For some reason I have
>>>>>>> to keep a large amount of state till the window is GCed. The state is
>>>>>>> within the Accumulator ( ACC ). I am hitting a memory bottleneck and
>>>>>>> would like to offload the state to the state backend ( RocksDB ),
>>>>>>> keeping the between-checkpoint state in memory ( seems to be an
>>>>>>> obvious fix ). I am not, though, allowed to have a
>>>>>>> RichAggregateFunction in the aggregate method of a windowed stream.
>>>>>>> That raises 2 questions:
>>>>>>>
>>>>>>> 1. Why ?
>>>>>>> 2. Is there an alternative for stateful window aggregation where we
>>>>>>> manage the state ?
>>>>>>>
>>>>>>> Thanks Vishal
>>>>>>>
>>>>>>>
>>>>>>> Here is the code ( generics but it works )
>>>>>>>
>>>>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>>>>         .keyBy(keySelector)
>>>>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>>>>         .aggregate(
>>>>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>
>>>>>>>                     // Each window starts from a reset copy of a template accumulator.
>>>>>>>                     @Override
>>>>>>>                     public ACC createAccumulator() {
>>>>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>>>>                         newInstance.resetLocal();
>>>>>>>                         return newInstance;
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public void add(IN value, ACC accumulator) {
>>>>>>>                         accumulator.add(value);
>>>>>>>                     }
>>>>>>>
>>>>>>>                     @Override
>>>>>>>                     public OUT getResult(ACC accumulator) {
>>>>>>>                         return accumulator.getLocalValue();
>>>>>>>                     }
>>>>>>>
>>>>>>>                     // Called when session windows merge.
>>>>>>>                     @Override
>>>>>>>                     public ACC merge(ACC a, ACC b) {
>>>>>>>                         a.merge(b);
>>>>>>>                         return a;
>>>>>>>                     }
>>>>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>>>>                     // Forwards the single pre-aggregated result of the window.
>>>>>>>                     @Override
>>>>>>>                     public void apply(KEY s, TimeWindow window,
>>>>>>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>>>>                         out.collect(input.iterator().next());
>>>>>>>                     }
>>>>>>>                 }, accType, aggregationResultType,
>>>>>>>                 aggregationResultType);