A bit of extra information on the example where I posted the link:

The example checks whether two events follow each other within a certain
time:
  - The first event in the example is called "compute.instance.create.start"
(in your case, it would be the event that an order was placed)
  - The second event is called "trove.instance.create" - (in your case that
the package was sent)

What the timeout window does is the following:
  - It triggers either on the second event, or after the timeout is expired
  - The window function checks if the last event was the correct second
event. If yes, it sends a Result(OK), if not it sends a Result(TIMEOUT).

Hope that this helps you build your application!



On Wed, Dec 2, 2015 at 6:25 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Mihail!
>
> Do I understand you correctly that the use case is to raise an alarm if an
> order has not been processed within a certain time period (certain number
> of days) ?
>
> If that is the case, the use case is actually perfect for a special form
> of session windows that monitor such timeouts. I have prototyped a sample
> application for a different use case, but it should fit your use case as
> well:
>
> https://github.com/StephanEwen/flink-demos/blob/master/timeout-monitoring/src/main/java/com/mythingy/streaming/EventStreamAnalysis.java
>
> In that example, the timeout is 5 seconds, but there is no reason why the
> timeout could not be multiple days. Windows may be very long - no problem.
>
> Unlike many other streaming systems, each key has an individual window, so
> one key's session window may start at one point in time, and the other
> key's session window at a very different point. One window may finish
> within in a few hours (fast processed order), one window see the timout
> after three days (order that was not processed in time).
>
> Greetings,
> Stephan
>
>
> On Wed, Dec 2, 2015 at 6:11 PM, Vieru, Mihail <mihail.vi...@zalando.de>
> wrote:
>
>> Hi Gyula, Hi Stephan,
>>
>> thank you for your replies.
>>
>> We need a state which grows indefinitely for the following use case. An
>> event is created when a customer places an order. Another event is created
>> when the order is sent. These events typically occur within days. We need
>> to catch the cases when the said events occur over a specified time period
>> to raise an alarm.
>>
>> So having a window of a couple of days is not feasible. Thus we need the
>> state.
>>
>> I believe having a different state backend would circumvent the OOM
>> issue. We were thinking of Redis for performance reasons. MySQL might do as
>> well, if it doesn't slow down the processing too much.
>>
>> Are there limitations for SqlStateBackend when working with state only?
>> When would the window state limitation occur?
>>
>> Cheers,
>> Mihail
>>
>>
>> 2015-12-02 13:38 GMT+01:00 Stephan Ewen <se...@apache.org>:
>>
>>> Mihail!
>>>
>>> The Flink windows are currently in-memory only. There are plans to relax
>>> that, but for the time being, having enough memory in the cluster is
>>> important.
>>>
>>> @Gyula: I think window state is currently also limited when using the
>>> SqlStateBackend, by the size of a row in the database (because windows are
>>> not key/value state currently)
>>>
>>>
>>> Here are some simple rules-of-thumb to work with:
>>>
>>> 1) For windows, the number of expected keys can be without bound. It is
>>> important to have a rough upper bound for the number of "active keys at a
>>> certain time". For example, if you have your time windows (let's say by 10
>>> minutes or so), it only matters how many keys you have within each 10
>>> minute interval. Those define how much memory you need.
>>>
>>> 2) If you work with the "OperatorState" abstraction, then you need to
>>> think about cleanup a bit. The OperatorState keeps state currently for as
>>> long until you set the state for the key to "null". This manual state is
>>> explicitly designed to allow you to keep state across windows and across
>>> very long time. On the flip side, you need to manage the amount of state
>>> you store, by releasing state for keys.
>>>
>>> 3) If a certain key space grows infinite, you should "scope the state by
>>> time". A pragmatic solution for that is to define a session window:
>>>   - The session length defines after what inactivity the state is
>>> cleaned (let's say 1h session length or so)
>>>   - The trigger implements this session (there are a few mails on this
>>> list already that explain how to do this) and take care of evaluating on
>>> every element.
>>>   - A count(1) evictor makes sure only one element is ever stored
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Dec 2, 2015 at 11:37 AM, Gyula Fóra <gyf...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am working on a use case that involves storing state for billions of
>>>> keys. For this we use a MySql state backend that will write each key-value
>>>> state to MySql server so it will only hold a limited set of key-value pairs
>>>> on heap while maintaining the processing guarantees.
>>>>
>>>> This will keep our streaming job from running out of memory as most of
>>>> the state is off heap. I am not sure if this is relevant to your use case
>>>> but if the state size grows indefinitely you might want to give it a try.
>>>>
>>>> I will write a detailed guide in some days but if you want to get
>>>> started check this one out:
>>>>
>>>> https://docs.google.com/document/d/1xx3J88ZR_kuYYHAil3HD3YSBLqBTKV4Pu1SLwhiRiGs/edit?usp=sharing
>>>>
>>>> There are some pending improvements that I will commit in the next days
>>>> that will increase the performance of the MySql adapter
>>>>
>>>> Let me know if you are interested in this!
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>>
>>>> Vieru, Mihail <mihail.vi...@zalando.de> ezt írta (időpont: 2015. dec.
>>>> 2., Sze, 11:26):
>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> we have no upper bound for the number of expected keys. The max size
>>>>> for an element is 1 KB.
>>>>>
>>>>> There are 2 Maps, a KeyBy, a timeWindow, a Reduce and a writeAsText
>>>>> operators in the job. In the first Map we parse the contained JSON object
>>>>> in each element and forward it as a Flink Tuple. In the Reduce we update
>>>>> the state for each key. That's about it.
>>>>>
>>>>> Best,
>>>>> Mihail
>>>>>
>>>>>
>>>>> 2015-12-02 11:09 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>>
>>>>>> Hi Mihail,
>>>>>> could you please give some information about the number of keys that
>>>>>> you are expecting in the data and how big the elements are that you are
>>>>>> processing in the window.
>>>>>>
>>>>>> Also, are there any other operations that could be taxing on Memory.
>>>>>> I think the different exception you see for 500MB mem size is just 
>>>>>> because
>>>>>> Java notices that it ran out of memory at a different part in the 
>>>>>> program.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> > On 02 Dec 2015, at 10:57, Vieru, Mihail <mihail.vi...@zalando.de>
>>>>>> wrote:
>>>>>> >
>>>>>> > Yes, with the "start-cluster-streaming.sh" script.
>>>>>> > If the TaskManager gets 5GB of heap it manages to process ~100
>>>>>> million messages and then throws the above OOM.
>>>>>> > If it gets only 500MB it manages to process ~8 million and a
>>>>>> somewhat misleading exception is thrown:
>>>>>> >
>>>>>> > 12/01/2015 19:14:07    Source: Custom Source -> Map -> Map(1/1)
>>>>>> switched to FAILED
>>>>>> > java.lang.Exception: Java heap space
>>>>>> >     at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>>> > Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>>> >     at org.json.simple.parser.Yylex.<init>(Yylex.java:231)
>>>>>> >     at org.json.simple.parser.JSONParser.<init>(JSONParser.java:34)
>>>>>> >     at
>>>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:70)
>>>>>> >     at
>>>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$1.map(ItemPriceAvgPerOrder.java:65)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$SourceOutput.collect(SourceStreamTask.java:97)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:92)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:450)
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > 2015-12-02 10:45 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>>>>>> > Its good news that the issue has been resolved.
>>>>>> >
>>>>>> > Regarding the OOM, did you start Flink in the streaming mode?
>>>>>> >
>>>>>> > On Wed, Dec 2, 2015 at 10:18 AM, Vieru, Mihail <
>>>>>> mihail.vi...@zalando.de> wrote:
>>>>>> > Thank you, Robert! The issue with Kafka is now solved with the
>>>>>> 0.10-SNAPSHOT dependency.
>>>>>> >
>>>>>> > We have run into an OutOfMemory exception though, which appears to
>>>>>> be related to the state. As my colleague, Javier Lopez, mentioned in a
>>>>>> previous thread, state handling is crucial for our use case. And as the
>>>>>> jobs are intended to run for months, stability plays an important role in
>>>>>> choosing a stream processing framework.
>>>>>> >
>>>>>> > 12/02/2015 10:03:53    Fast TumblingTimeWindows(5000) of Reduce at
>>>>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>>>>>> FAILED
>>>>>> > java.lang.OutOfMemoryError: Java heap space
>>>>>> >     at java.util.HashMap.resize(HashMap.java:703)
>>>>>> >     at java.util.HashMap.putVal(HashMap.java:662)
>>>>>> >     at java.util.HashMap.put(HashMap.java:611)
>>>>>> >     at
>>>>>> org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
>>>>>> >     at
>>>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:121)
>>>>>> >     at
>>>>>> de.zalando.saiki.gerakanflink.ItemPriceAvgPerOrder$3.reduce(ItemPriceAvgPerOrder.java:108)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.KeyMap.putOrAggregate(KeyMap.java:196)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.addElementToLatestPane(AggregatingKeyedTimePanes.java:50)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.processElement(AbstractAlignedProcessingTimeWindowOperator.java:210)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>>>>> >     at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > 2015-12-01 17:42 GMT+01:00 Maximilian Michels <m...@apache.org>:
>>>>>> > Thanks! I've linked the issue in JIRA.
>>>>>> >
>>>>>> > On Tue, Dec 1, 2015 at 5:39 PM, Robert Metzger <rmetz...@apache.org>
>>>>>> wrote:
>>>>>> > > I think its this one
>>>>>> https://issues.apache.org/jira/browse/KAFKA-824
>>>>>> > >
>>>>>> > > On Tue, Dec 1, 2015 at 5:37 PM, Maximilian Michels <
>>>>>> m...@apache.org> wrote:
>>>>>> > >>
>>>>>> > >> I know this has been fixed already but, out of curiosity, could
>>>>>> you
>>>>>> > >> point me to the Kafka JIRA issue for this
>>>>>> > >> bug? From the Flink issue it looks like this is a Zookeeper
>>>>>> version
>>>>>> > >> mismatch.
>>>>>> > >>
>>>>>> > >> On Tue, Dec 1, 2015 at 5:16 PM, Robert Metzger <
>>>>>> rmetz...@apache.org>
>>>>>> > >> wrote:
>>>>>> > >> > Hi Gyula,
>>>>>> > >> >
>>>>>> > >> > no, I didn't ;) We still deploy 0.10-SNAPSHOT versions from the
>>>>>> > >> > "release-0.10" branch to Apache's maven snapshot repository.
>>>>>> > >> >
>>>>>> > >> >
>>>>>> > >> > I don't think Mihail's code will run when he's compiling it
>>>>>> against
>>>>>> > >> > 1.0-SNAPSHOT.
>>>>>> > >> >
>>>>>> > >> >
>>>>>> > >> > On Tue, Dec 1, 2015 at 5:13 PM, Gyula Fóra <
>>>>>> gyula.f...@gmail.com> wrote:
>>>>>> > >> >>
>>>>>> > >> >> Hi,
>>>>>> > >> >>
>>>>>> > >> >> I think Robert meant to write setting the connector
>>>>>> dependency to
>>>>>> > >> >> 1.0-SNAPSHOT.
>>>>>> > >> >>
>>>>>> > >> >> Cheers,
>>>>>> > >> >> Gyula
>>>>>> > >> >>
>>>>>> > >> >> Robert Metzger <rmetz...@apache.org> ezt írta (időpont:
>>>>>> 2015. dec. 1.,
>>>>>> > >> >> K,
>>>>>> > >> >> 17:10):
>>>>>> > >> >>>
>>>>>> > >> >>> Hi Mihail,
>>>>>> > >> >>>
>>>>>> > >> >>> the issue is actually a bug in Kafka. We have a JIRA in
>>>>>> Flink for this
>>>>>> > >> >>> as
>>>>>> > >> >>> well: https://issues.apache.org/jira/browse/FLINK-3067
>>>>>> > >> >>>
>>>>>> > >> >>> Sadly, we haven't released a fix for it yet. Flink 0.10.2
>>>>>> will contain
>>>>>> > >> >>> a
>>>>>> > >> >>> fix.
>>>>>> > >> >>>
>>>>>> > >> >>> Since the kafka connector is not contained in the flink
>>>>>> binary, you
>>>>>> > >> >>> can
>>>>>> > >> >>> just set the version in your maven pom file to
>>>>>> 0.10-SNAPSHOT. Maven
>>>>>> > >> >>> will
>>>>>> > >> >>> then download the code planned for the 0.10-SNAPSHOT release.
>>>>>> > >> >>>
>>>>>> > >> >>> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail
>>>>>> > >> >>> <mihail.vi...@zalando.de>
>>>>>> > >> >>> wrote:
>>>>>> > >> >>>>
>>>>>> > >> >>>> Hi,
>>>>>> > >> >>>>
>>>>>> > >> >>>> we get the following NullPointerException after ~50 minutes
>>>>>> when
>>>>>> > >> >>>> running
>>>>>> > >> >>>> a streaming job with windowing and state that reads data
>>>>>> from Kafka
>>>>>> > >> >>>> and
>>>>>> > >> >>>> writes the result to local FS.
>>>>>> > >> >>>> There are around 170 million messages to be processed,
>>>>>> Flink 0.10.1
>>>>>> > >> >>>> stops at ~8 million.
>>>>>> > >> >>>> Flink runs locally, started with the
>>>>>> "start-cluster-streaming.sh"
>>>>>> > >> >>>> script.
>>>>>> > >> >>>>
>>>>>> > >> >>>> 12/01/2015 15:06:24    Job execution switched to status
>>>>>> RUNNING.
>>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map ->
>>>>>> Map(1/1)
>>>>>> > >> >>>> switched
>>>>>> > >> >>>> to SCHEDULED
>>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map ->
>>>>>> Map(1/1)
>>>>>> > >> >>>> switched
>>>>>> > >> >>>> to DEPLOYING
>>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of
>>>>>> Reduce at
>>>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1)
>>>>>> switched to
>>>>>> > >> >>>> SCHEDULED
>>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of
>>>>>> Reduce at
>>>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1)
>>>>>> switched to
>>>>>> > >> >>>> DEPLOYING
>>>>>> > >> >>>> 12/01/2015 15:06:24    Source: Custom Source -> Map ->
>>>>>> Map(1/1)
>>>>>> > >> >>>> switched
>>>>>> > >> >>>> to RUNNING
>>>>>> > >> >>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of
>>>>>> Reduce at
>>>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1)
>>>>>> switched to
>>>>>> > >> >>>> RUNNING
>>>>>> > >> >>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of
>>>>>> Reduce at
>>>>>> > >> >>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1)
>>>>>> switched to
>>>>>> > >> >>>> CANCELED
>>>>>> > >> >>>> 12/01/2015 15:56:08    Source: Custom Source -> Map ->
>>>>>> Map(1/1)
>>>>>> > >> >>>> switched
>>>>>> > >> >>>> to FAILED
>>>>>> > >> >>>> java.lang.Exception
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>> > >> >>>>     at
>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>> > >> >>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> > >> >>>> Caused by: java.lang.NullPointerException
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>>>>> > >> >>>>     at
>>>>>> org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>>>>> > >> >>>>     at
>>>>>> org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>>>>> > >> >>>>     at
>>>>>> org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>>>>> > >> >>>>     at
>>>>>> kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>>>>> > >> >>>>     at
>>>>>> kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>>>>> > >> >>>>     at
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>>>>> > >> >>>>
>>>>>> > >> >>>>
>>>>>> > >> >>>> Any ideas on what could cause this behaviour?
>>>>>> > >> >>>>
>>>>>> > >> >>>> Best,
>>>>>> > >> >>>> Mihail
>>>>>> > >> >>>
>>>>>> > >> >>>
>>>>>> > >> >
>>>>>> > >
>>>>>> > >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>
>>
>

Reply via email to