Hi Mihail!

Do I understand you correctly that the use case is to raise an alarm if an order has not been processed within a certain time period (a certain number of days)?
If that is the case, the use case is actually perfect for a special form of session windows that monitor such timeouts. I have prototyped a sample application for a different use case, but it should fit your use case as well:
https://github.com/StephanEwen/flink-demos/blob/master/timeout-monitoring/src/main/java/com/mythingy/streaming/EventStreamAnalysis.java

In that example, the timeout is 5 seconds, but there is no reason why the timeout could not be multiple days. Windows may be very long - no problem. Unlike many other streaming systems, each key has an individual window, so one key's session window may start at one point in time and another key's session window at a very different point. One window may finish within a few hours (a quickly processed order), another may only see the timeout after three days (an order that was not processed in time).

Greetings,
Stephan

On Wed, Dec 2, 2015 at 6:11 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
> Hi Gyula, Hi Stephan,
>
> Thank you for your replies.
>
> We need a state which grows indefinitely for the following use case. An event is created when a customer places an order. Another event is created when the order is sent. These events typically occur within days. We need to catch the cases in which these two events lie more than a specified time period apart, so that we can raise an alarm.
>
> So having a window of a couple of days is not feasible. Thus we need the state.
>
> I believe having a different state backend would circumvent the OOM issue. We were thinking of Redis for performance reasons. MySQL might do as well, if it doesn't slow down the processing too much.
>
> Are there limitations for SqlStateBackend when working with state only? When would the window state limitation occur?
>
> Cheers,
> Mihail
>
> 2015-12-02 13:38 GMT+01:00 Stephan Ewen <se...@apache.org>:
>
>> Mihail!
>>
>> The Flink windows are currently in-memory only. There are plans to relax that, but for the time being, having enough memory in the cluster is important.
>>
>> @Gyula: I think window state is currently also limited when using the SqlStateBackend, by the size of a row in the database (because windows are not key/value state currently).
>>
>> Here are some simple rules of thumb to work with:
>>
>> 1) For windows, the number of expected keys can be without bound. It is important to have a rough upper bound for the number of "active keys at a certain time". For example, if you have time windows (let's say of 10 minutes or so), it only matters how many keys you have within each 10-minute interval. Those define how much memory you need.
>>
>> 2) If you work with the "OperatorState" abstraction, then you need to think about cleanup a bit. The OperatorState currently keeps the state for a key until you set that key's state to "null". This manual state is explicitly designed to allow you to keep state across windows and across very long time spans. On the flip side, you need to manage the amount of state you store by releasing the state for keys you no longer need.
>>
>> 3) If a certain key space grows without bound, you should "scope the state by time". A pragmatic solution for that is to define a session window:
>> - The session length defines after how much inactivity the state is cleaned (let's say a 1h session length or so)
>> - The trigger implements this session (there are a few mails on this list already that explain how to do this) and takes care of evaluating on every element.
>> - A count(1) evictor makes sure only one element is ever stored (a rough sketch of this setup follows further down).
>>
>> Greetings,
>> Stephan
>>
>> On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra <gyf...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I am working on a use case that involves storing state for billions of keys. For this we use a MySQL state backend that writes each key-value state to a MySQL server, so it only holds a limited set of key-value pairs on heap while maintaining the processing guarantees.
>>>
>>> This will keep our streaming job from running out of memory, as most of the state is off heap. I am not sure if this is relevant to your use case, but if the state size grows indefinitely you might want to give it a try.
>>>
>>> I will write a detailed guide in a few days, but if you want to get started check this one out:
>>>
>>> https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing
>>>
>>> There are some pending improvements that I will commit in the next few days that will increase the performance of the MySQL adapter.
>>>
>>> Let me know if you are interested in this!
>>>
>>> Cheers,
>>> Gyula
>>>
>>> Vieru, Mihail <mihail.vi...@zalando.de> wrote (on Wed, 2 Dec 2015, 11:26):
>>>
>>>> Hi Aljoscha,
>>>>
>>>> We have no upper bound for the number of expected keys. The max size for an element is 1 KB.
>>>>
>>>> There are two Map operators, a KeyBy, a timeWindow, a Reduce and a writeAsText in the job. In the first Map we parse the contained JSON object in each element and forward it as a Flink Tuple. In the Reduce we update the state for each key. That's about it.
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>
>>>>> Hi Mihail,
>>>>> could you please give some information about the number of keys that you are expecting in the data and how big the elements are that you are processing in the window?
>>>>>
>>>>> Also, are there any other operations that could be taxing on memory? I think the different exception you see for the 500MB memory size is just because Java notices that it ran out of memory at a different part of the program.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> > On 02 Dec 2015, at 10:57, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>>>>> >
>>>>> > Yes, with the "start-cluster-streaming.sh" script.
>>>>> > If the TaskManager gets 5GB of heap, it manages to process ~100 million messages and then throws the above OOM.
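To make point 3) above concrete, such a timeout/session window could look roughly like the sketch below. This is only a sketch, not the linked example verbatim: it is written against a later version of the Trigger/Evictor API, so class names and method signatures may differ slightly in 0.10, and a production version would also have to delete or ignore stale timers (e.g. via ctx.deleteProcessingTimeTimer) instead of simply letting them fire.

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Evaluates the window on every element and additionally fires (and purges the
// per-key window state) once no element has arrived for 'timeoutMillis'.
public class TimeoutTrigger extends Trigger<Object, GlobalWindow> {

    private final long timeoutMillis;

    public TimeoutTrigger(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) throws Exception {
        // (re-)arm the timeout and evaluate on every element, as described in point 3)
        ctx.registerProcessingTimeTimer(System.currentTimeMillis() + timeoutMillis);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) throws Exception {
        // timeout expired: emit what we have (the alarm case) and release the state
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window,
                                     TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        // nothing extra to clean up in this sketch
    }
}

It would be wired up roughly like this (the event type, key field and window function are illustrative, not taken from this thread):

DataStream<OrderEvent> orderEvents = ...;

orderEvents
    .keyBy("orderId")
    .window(GlobalWindows.create())
    .trigger(new TimeoutTrigger(3L * 24 * 60 * 60 * 1000))  // e.g. a 3-day timeout
    .evictor(CountEvictor.of(1))                            // keep only the latest element per key
    .apply(new RaiseAlarmIfNotProcessedFunction());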
>>>>> > If it gets only 500MB, it manages to process ~8 million and a somewhat misleading exception is thrown:
>>>>> >
>>>>> > 12/01/2015 19:14:07  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>>>>> > java.lang.Exception: Java heap space
>>>>> >     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>> >     at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>>>>> >     at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
>>>>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
>>>>> >     at de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
>>>>> >     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>>>>> >     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>>>>> >     at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>>>>> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
>>>>> >     at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
>>>>> >     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
>>>>> >
>>>>> > 2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>>>>> > It's good news that the issue has been resolved.
>>>>> >
>>>>> > Regarding the OOM, did you start Flink in the streaming mode?
>>>>> >
>>>>> > On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>>>>> > Thank you, Robert! The issue with Kafka is now solved with the 0.10-SNAPSHOT dependency.
>>>>> >
>>>>> > We have run into an OutOfMemory exception though, which appears to be related to the state. As my colleague, Javier Lopez, mentioned in a previous thread, state handling is crucial for our use case. And as the jobs are intended to run for months, stability plays an important role in choosing a stream processing framework.
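For reference, the job discussed here (two Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText, with JSON parsing in the first Map and per-key state updated in the Reduce) has roughly the shape sketched below. This is a reconstruction for illustration only: all class, topic, path and property names are made up, the second Map is omitted, and the Kafka consumer class and its constructor are an assumption about the 0.10 connector. The point is merely to show where the heap pressure comes from: both the window contents and the per-key K/V state live on the TaskManager heap, which is what the OutOfMemoryError below runs into.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

public class OrderJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
        props.setProperty("group.id", "order-job");                // placeholder

        DataStream<String> rawEvents =
                env.addSource(new FlinkKafkaConsumer082<String>("orders", new SimpleStringSchema(), props));

        rawEvents
            // first Map: parse the JSON payload and forward it as a Flink Tuple
            .map(new MapFunction<String, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(String json) throws Exception {
                    JSONObject obj = (JSONObject) new JSONParser().parse(json);
                    return new Tuple2<>((String) obj.get("orderId"),
                                        Double.parseDouble(obj.get("price").toString()));
                }
            })
            .keyBy(0)                      // key by order id
            .timeWindow(Time.seconds(5))   // the "Fast TumblingTimeWindows(5000)" from the log
            .reduce(new ReduceFunction<Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> reduce(Tuple2<String, Double> a,
                                                     Tuple2<String, Double> b) throws Exception {
                    // the real job additionally updates per-key OperatorState here
                    // (visible as AbstractHeapKvState.update in the trace below);
                    // that state stays on the heap until it is explicitly released
                    return new Tuple2<>(a.f0, a.f1 + b.f1);
                }
            })
            .writeAsText("/tmp/item-price-avg");  // placeholder path on the local FS

        env.execute("ItemPriceAvgPerOrder (sketch)");
    }
}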
>>>>> > >>>>> > 12/02/2015 10:03:53 Fast TumblingTimeWindows(5000) of Reduce at >>>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to >>>>> FAILED >>>>> > java.lang.OutOfMemoryError: Java heap space >>>>> > at java.util.HashMap.resize(HashMap.java:703) >>>>> > at java.util.HashMap.putVal(HashMap.java:662) >>>>> > at java.util.HashMap.put(HashMap.java:611) >>>>> > at >>>>> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98) >>>>> > at >>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121) >>>>> > at >>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108) >>>>> > at >>>>> org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196) >>>>> > at >>>>> org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50) >>>>> > at >>>>> org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210) >>>>> > at >>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166) >>>>> > at >>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63) >>>>> > at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218) >>>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>>>> > at java.lang.Thread.run(Thread.java:745) >>>>> > >>>>> > >>>>> > >>>>> > >>>>> > 2015-12-01 17:42 GMT+01:00 Maximilian Michels <m...@apache.org>: >>>>> > Thanks! I've linked the issue in JIRA. >>>>> > >>>>> > On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org> >>>>> wrote: >>>>> > > I think its this one >>>>> https://issues.apache.org/jira/browse/KAFKA-824 >>>>> > > >>>>> > > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <m...@apache.org> >>>>> wrote: >>>>> > >> >>>>> > >> I know this has been fixed already but, out of curiosity, could >>>>> you >>>>> > >> point me to the Kafka JIRA issue for this >>>>> > >> bug? From the Flink issue it looks like this is a Zookeeper >>>>> version >>>>> > >> mismatch. >>>>> > >> >>>>> > >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger < >>>>> rmetz...@apache.org> >>>>> > >> wrote: >>>>> > >> > Hi Gyula, >>>>> > >> > >>>>> > >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the >>>>> > >> > "release-0.10" branch to Apache's maven snapshot repository. >>>>> > >> > >>>>> > >> > >>>>> > >> > I don't think Mihail's code will run when he's compiling it >>>>> against >>>>> > >> > 1.0-SNAPSHOT. >>>>> > >> > >>>>> > >> > >>>>> > >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra < >>>>> gyula.f...@gmail.com> wrote: >>>>> > >> >> >>>>> > >> >> Hi, >>>>> > >> >> >>>>> > >> >> I think Robert meant to write setting the connector dependency >>>>> to >>>>> > >> >> 1.0-SNAPSHOT. >>>>> > >> >> >>>>> > >> >> Cheers, >>>>> > >> >> Gyula >>>>> > >> >> >>>>> > >> >> Robert Metzger <rmetz...@apache.org> ezt írta (időpont: 2015. >>>>> dec. 1., >>>>> > >> >> K, >>>>> > >> >> 17:10): >>>>> > >> >>> >>>>> > >> >>> Hi Mihail, >>>>> > >> >>> >>>>> > >> >>> the issue is actually a bug in Kafka. We have a JIRA in Flink >>>>> for this >>>>> > >> >>> as >>>>> > >> >>> well: https://issues.apache.org/jira/browse/FLINK-3067 >>>>> > >> >>> >>>>> > >> >>> Sadly, we haven't released a fix for it yet. 
>>>>> > >> >>> Flink 0.10.2 will contain a fix.
>>>>> > >> >>>
>>>>> > >> >>> Since the Kafka connector is not contained in the Flink binary, you can just set the version in your Maven pom file to 0.10-SNAPSHOT. Maven will then download the code planned for the 0.10.2 release.
>>>>> > >> >>>
>>>>> > >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de> wrote:
>>>>> > >> >>>>
>>>>> > >> >>>> Hi,
>>>>> > >> >>>>
>>>>> > >> >>>> we get the following NullPointerException after ~50 minutes when running a streaming job with windowing and state that reads data from Kafka and writes the result to the local FS.
>>>>> > >> >>>> There are around 170 million messages to be processed; Flink 0.10.1 stops at ~8 million.
>>>>> > >> >>>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>>>> > >> >>>>
>>>>> > >> >>>> 12/01/2015 15:06:24  Job execution switched to status RUNNING.
>>>>> > >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>>>>> > >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>>>>> > >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>>>>> > >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>>>>> > >> >>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>>>>> > >> >>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>>>>> > >> >>>> 12/01/2015 15:56:08  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>>>>> > >> >>>> 12/01/2015 15:56:08  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>>>>> > >> >>>> java.lang.Exception
>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>>>> > >> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>> > >> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>> > >> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>> > >> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>> > >> >>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> > >> >>>> Caused by: java.lang.NullPointerException
>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>>>> > >> >>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>>>> > >> >>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>>>> > >> >>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>>>> > >> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>>>> > >> >>>>
>>>>> > >> >>>> Any ideas on what could cause this behaviour?
>>>>> > >> >>>>
>>>>> > >> >>>> Best,
>>>>> > >> >>>> Mihail