Hi Mihail!

Do I understand correctly that the use case is to raise an alarm if an
order has not been processed within a certain time period (a certain number
of days)?

If that is the case, the use case is actually perfect for a special form of
session windows that monitor such timeouts. I have prototyped a sample
application for a different use case, but it should fit your use case as
well:
https://github.com/StephanEwen/flink-demos/blob/master/timeout-monitoring/src/main/java/com/mythingy/streaming/EventStreamAnalysis.java

In that example, the timeout is 5 seconds, but there is no reason why the
timeout could not be multiple days. Windows may be very long - no problem.

Unlike in many other streaming systems, each key has its own window, so
one key's session window may start at one point in time and another key's
session window at a very different point. One window may finish within a
few hours (an order that was processed quickly), while another may hit the
timeout after three days (an order that was not processed in time).
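
To make the per-key idea concrete, here is a minimal plain-Java sketch of
the bookkeeping such a timeout session does. It is not Flink code and not
the code from the linked demo: the class name OrderTimeoutMonitor and all of
its methods are made up for illustration, and it assumes the caller feeds in
timestamps and calls advanceTo() with the current time before handling each
event.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of per-key timeout sessions; not a Flink API. */
public class OrderTimeoutMonitor {
    private final long timeoutMillis;
    // order id -> time the order was placed (the start of that key's session)
    private final Map<String, Long> openSessions = new LinkedHashMap<>();

    public OrderTimeoutMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Opens a session for the order; every key gets its own start time. */
    public void orderPlaced(String orderId, long timestamp) {
        openSessions.putIfAbsent(orderId, timestamp);
    }

    /**
     * Closes the session and releases its state.
     * Returns true if the session was still open, false if it was unknown
     * or had already been reported as timed out.
     */
    public boolean orderSent(String orderId) {
        return openSessions.remove(orderId) != null;
    }

    /** Returns the orders whose timeout has elapsed at 'now' and drops their state. */
    public List<String> advanceTo(long now) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = openSessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> session = it.next();
            if (now - session.getValue() >= timeoutMillis) {
                timedOut.add(session.getKey());
                it.remove(); // state is only kept for keys that are still "active"
            }
        }
        return timedOut;
    }

    /** Number of keys that currently hold state. */
    public int openCount() {
        return openSessions.size();
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        OrderTimeoutMonitor monitor = new OrderTimeoutMonitor(3 * day);

        monitor.orderPlaced("A", 0L);   // session for A starts at t = 0
        monitor.orderPlaced("B", day);  // session for B starts one day later
        monitor.orderSent("A");         // A is processed in time, its state is released

        System.out.println("alarms at day 3: " + monitor.advanceTo(3 * day));
        System.out.println("alarms at day 4: " + monitor.advanceTo(4 * day));
    }
}
```

The point of the sketch is only that the memory needed depends on the number
of orders that are open at the same time, not on the total number of orders
ever seen, because each key's state is dropped when its session ends.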

Greetings,
Stephan


On Wed, Dec 2, 2015 at 6:11 PM, Vieru, Mihail <mihail.vi...@zalando.de>
wrote:

> Hi Gyula, Hi Stephan,
>
> thank you for your replies.
>
> We need a state which grows indefinitely for the following use case. An
> event is created when a customer places an order. Another event is created
> when the order is sent. These events typically occur within days. We need
> to catch the cases when the said events occur over a specified time period
> to raise an alarm.
>
> So having a window of a couple of days is not feasible. Thus we need the
> state.
>
> I believe having a different state backend would circumvent the OOM issue.
> We were thinking of Redis for performance reasons. MySQL might do as well,
> if it doesn't slow down the processing too much.
>
> Are there limitations for SqlStateBackend when working with state only?
> When would the window state limitation occur?
>
> Cheers,
> Mihail
>
>
> 2015-12-02 13:38 GMT+01:00 Stephan Ewen <se...@apache.org>:
>
>> Mihail!
>>
>> The Flink windows are currently in-memory only. There are plans to relax
>> that, but for the time being, having enough memory in the cluster is
>> important.
>>
>> @Gyula: I think window state is currently also limited when using the
>> SqlStateBackend, by the size of a row in the database (because windows are
>> not key/value state currently)
>>
>>
>> Here are some simple rules-of-thumb to work with:
>>
>> 1) For windows, the number of expected keys can be without bound. It is
>> important to have a rough upper bound for the number of "active keys at a
>> certain time". For example, if you have your time windows (let's say by 10
>> minutes or so), it only matters how many keys you have within each 10
>> minute interval. Those define how much memory you need.
>>
>> 2) If you work with the "OperatorState" abstraction, then you need to
>> think about cleanup a bit. The OperatorState currently keeps the state for
>> a key until you set that key's state to "null". This manual state is
>> explicitly designed to allow you to keep state across windows and across
>> very long time spans. On the flip side, you need to manage the amount of
>> state you store, by releasing state for keys.
>>
>> 3) If a certain key space grows without bound, you should "scope the state
>> by time". A pragmatic solution for that is to define a session window:
>>   - The session length defines after how much inactivity the state is
>> cleaned up (let's say 1h session length or so)
>>   - The trigger implements this session (there are a few mails on this
>> list already that explain how to do this) and takes care of evaluating on
>> every element.
>>   - A count(1) evictor makes sure only one element is ever stored
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra <gyf...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I am working on a use case that involves storing state for billions of
>>> keys. For this we use a MySql state backend that will write each key-value
>>> state to MySql server so it will only hold a limited set of key-value pairs
>>> on heap while maintaining the processing guarantees.
>>>
>>> This will keep our streaming job from running out of memory as most of
>>> the state is off heap. I am not sure if this is relevant to your use case
>>> but if the state size grows indefinitely you might want to give it a try.
>>>
>>> I will write a detailed guide in some days but if you want to get
>>> started check this one out:
>>>
>>> https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing
>>>
>>> There are some pending improvements that I will commit in the next days
>>> that will increase the performance of the MySql adapter
>>>
>>> Let me know if you are interested in this!
>>>
>>> Cheers,
>>> Gyula
>>>
>>>
>>> Vieru, Mihail <mihail.vi...@zalando.de> ezt írta (időpont: 2015. dec.
>>> 2., Sze, 11:26):
>>>
>>>> Hi Aljoscha,
>>>>
>>>> we have no upper bound for the number of expected keys. The max size
>>>> for an element is 1 KB.
>>>>
>>>> There are 2 Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText
>>>> operators in the job. In the first Map we parse the contained JSON object
>>>> in each element and forward it as a Flink Tuple. In the Reduce we update
>>>> the state for each key. That's about it.
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>>
>>>> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>
>>>>> Hi Mihail,
>>>>> could you please give some information about the number of keys that
>>>>> you are expecting in the data and how big the elements are that you are
>>>>> processing in the window.
>>>>>
>>>>> Also, are there any other operations that could be taxing on memory? I
>>>>> think the different exception you see for 500MB mem size is just because
>>>>> Java notices that it ran out of memory at a different part in the program.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>> > On 02 Dec 2015, at 10:57, Vieru, Mihail <mihail.vi...@zalando.de>
>>>>> wrote:
>>>>> >
>>>>> > Yes, with the "start-cluster-streaming.sh" script.
>>>>> > If the TaskManager gets 5GB of heap it manages to process ~100
>>>>> million messages and then throws the above OOM.
>>>>> > If it gets only 500MB it manages to process ~8 million and a
>>>>> somewhat misleading exception is thrown:
>>>>> >
>>>>> > 12/01/2015 19:14:07    Source: Custom Source -> Map -> Map(1/1)
>>>>> switched to FAILED
>>>>> > java.lang.Exception: Java heap space
>>>>> >     at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>> >     at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>> >     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>> >     at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>>>>> >     at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
>>>>> >     at
>>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
>>>>> >     at
>>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
>>>>> >     at
>>>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
>>>>> >     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
>>>>> >     at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > 2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>>>>> > It's good news that the issue has been resolved.
>>>>> >
>>>>> > Regarding the OOM, did you start Flink in the streaming mode?
>>>>> >
>>>>> > On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <
>>>>> mihail.vi...@zalando.de> wrote:
>>>>> > Thank you, Robert! The issue with Kafka is now solved with the
>>>>> 0.10-SNAPSHOT dependency.
>>>>> >
>>>>> > We have run into an OutOfMemory exception though, which appears to
>>>>> be related to the state. As my colleague, Javier Lopez, mentioned in a
>>>>> previous thread, state handling is crucial for our use case. And as the
>>>>> jobs are intended to run for months, stability plays an important role in
>>>>> choosing a stream processing framework.
>>>>> >
>>>>> > 12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at
>>>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>>>>> FAILED
>>>>> > java.lang.OutOfMemoryError: Java heap space
>>>>> >     at java.util.HashMap.resize(HashMap.java:703)
>>>>> >     at java.util.HashMap.putVal(HashMap.java:662)
>>>>> >     at java.util.HashMap.put(HashMap.java:611)
>>>>> >     at
>>>>> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>>>>> >     at
>>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>>>>> >     at
>>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>>>> >     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > 2015-12-01 17:42 GMT+01:00 Maximilian Michels <m...@apache.org>:
>>>>> > Thanks! I've linked the issue in JIRA.
>>>>> >
>>>>> > On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org>
>>>>> wrote:
>>>>> > > I think it's this one
>>>>> https://issues.apache.org/jira/browse/KAFKA-824
>>>>> > >
>>>>> > > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org>
>>>>> wrote:
>>>>> > >>
>>>>> > >> I know this has been fixed already but, out of curiosity, could
>>>>> you
>>>>> > >> point me to the Kafka JIRA issue for this
>>>>> > >> bug? From the Flink issue it looks like this is a Zookeeper
>>>>> version
>>>>> > >> mismatch.
>>>>> > >>
>>>>> > >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <
>>>>> rmetz...@apache.org>
>>>>> > >> wrote:
>>>>> > >> > Hi Gyula,
>>>>> > >> >
>>>>> > >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>>>>> > >> > "release-0.10" branch to Apache's maven snapshot repository.
>>>>> > >> >
>>>>> > >> >
>>>>> > >> > I don't think Mihail's code will run when he's compiling it
>>>>> against
>>>>> > >> > 1.0-SNAPSHOT.
>>>>> > >> >
>>>>> > >> >
>>>>> > >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <
>>>>> gyula.f...@gmail.com> wrote:
>>>>> > >> >>
>>>>> > >> >> Hi,
>>>>> > >> >>
>>>>> > >> >> I think Robert meant to write setting the connector dependency
>>>>> to
>>>>> > >> >> 1.0-SNAPSHOT.
>>>>> > >> >>
>>>>> > >> >> Cheers,
>>>>> > >> >> Gyula
>>>>> > >> >>
>>>>> > >> >> Robert Metzger <rmetz...@apache.org> ezt írta (időpont: 2015.
>>>>> dec. 1.,
>>>>> > >> >> K,
>>>>> > >> >> 17:10):
>>>>> > >> >>>
>>>>> > >> >>> Hi Mihail,
>>>>> > >> >>>
>>>>> > >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink
>>>>> for this
>>>>> > >> >>> as
>>>>> > >> >>> well: https://issues.apache.org/jira/browse/FLINK-3067
>>>>> > >> >>>
>>>>> > >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2
>>>>> will contain
>>>>> > >> >>> a
>>>>> > >> >>> fix.
>>>>> > >> >>>
>>>>> > >> >>> Since the kafka connector is not contained in the flink
>>>>> binary, you
>>>>> > >> >>> can
>>>>> > >> >>> just set the version in your maven pom file to 0.10-SNAPSHOT.
>>>>> Maven
>>>>> > >> >>> will
>>>>> > >> >>> then download the code planned for the 0.10-SNAPSHOT release.
>>>>> > >> >>>
>>>>> > >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
>>>>> > >> >>> <mihail.vi...@zalando.de>
>>>>> > >> >>> wrote:
>>>>> > >> >>>>
>>>>> > >> >>>> Hi,
>>>>> > >> >>>>
>>>>> > >> >>>> we get the following NullPointerException after ~50 minutes
>>>>> when
>>>>> > >> >>>> running
>>>>> > >> >>>> a streaming job with windowing and state that reads data
>>>>> from Kafka
>>>>> > >> >>>> and
>>>>> > >> >>>> writes the result to local FS.
>>>>> > >> >>>> There are around 170 million messages to be processed, Flink
>>>>> 0.10.1
>>>>> > >> >>>> stops at ~8 million.
>>>>> > >> >>>> Flink runs locally, started with the
>>>>> "start-cluster-streaming.sh"
>>>>> > >> >>>> script.
>>>>> > >> >>>>
>>>>> > >> >>>> 12/01/2015 15:06:24    Job execution switched to status
>>>>> RUNNING.
>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map ->
>>>>> Map(1/1)
>>>>> > >> >>>> switched
>>>>> > >> >>>> to SCHEDULED
>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map ->
>>>>> Map(1/1)
>>>>> > >> >>>> switched
>>>>> > >> >>>> to DEPLOYING
>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of
>>>>> Reduce at
>>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1)
>>>>> switched to
>>>>> > >> >>>> SCHEDULED
>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of
>>>>> Reduce at
>>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1)
>>>>> switched to
>>>>> > >> >>>> DEPLOYING
>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map ->
>>>>> Map(1/1)
>>>>> > >> >>>> switched
>>>>> > >> >>>> to RUNNING
>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of
>>>>> Reduce at
>>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1)
>>>>> switched to
>>>>> > >> >>>> RUNNING
>>>>> > >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of
>>>>> Reduce at
>>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1)
>>>>> switched to
>>>>> > >> >>>> CANCELED
>>>>> > >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map ->
>>>>> Map(1/1)
>>>>> > >> >>>> switched
>>>>> > >> >>>> to FAILED
>>>>> > >> >>>> java.lang.Exception
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>> > >> >>>>     at
>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>> > >> >>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> > >> >>>> Caused by: java.lang.NullPointerException
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>>>> > >> >>>>     at
>>>>> org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>>>> > >> >>>>     at
>>>>> org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>>>> > >> >>>>     at
>>>>> org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>>>> > >> >>>>     at
>>>>> kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>>>> > >> >>>>     at
>>>>> kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>>>> > >> >>>>     at
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>>>> > >> >>>>
>>>>> > >> >>>>
>>>>> > >> >>>> Any ideas on what could cause this behaviour?
>>>>> > >> >>>>
>>>>> > >> >>>> Best,
>>>>> > >> >>>> Mihail
>>>>> > >> >>>
>>>>> > >> >>>
>>>>> > >> >
>>>>> > >
>>>>> > >
>>>>> >
>>>>> >
>>>>> >
>>>>>
>>>>>
>>>>
>>
>
