Re: CEP issue

2018-03-07 Thread Kostas Kloudas
Hi Vishal,

Thanks a lot for sharing your experience and the potential caveats to consider 
when 
specifying your pattern.

I agree that there is room for improvement when it comes to the state 
checkpointed in Flink.
We already have some ideas, but still, as you also said, the bulk of the space 
consumption
comes from the pattern definition. So it would be nice if more people did the 
same, i.e. shared 
their experience, and, why not, compiled a guide of things to avoid to put 
alongside the rest
of the FlinkCEP documentation.

What do you think?

Kostas



> On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> Hello all,  There were recent changes to the flink master that I pulled in 
> and that seems to have solved our issue.  
> 
> Few points 
> 
> * CEP is heavy, as the NFA transition matrix kept as state can be roughly n^2 
> in the worst case and can easily blow up space requirements. The after-match 
> skip strategy is likely to play a crucial role in keeping the state lean: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy.
> In our case we do not require partial matches within a match to contribute 
> to another potential match (noise for us), and thus SKIP_PAST_LAST_EVENT 
> was used, which on a match prunes the SharedBuffer (almost resets it); see the 
> sketch after these notes.
> 
> * The argument that pattern events should be lean holds even more strongly in 
> CEP, due to the potential exponential increase in space requirements. 
> 
> * The nature of the pattern needs consideration if state does blow up 
> for you.
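> 
> Roughly how the skip strategy is attached in our case (a simplified sketch; 
> Event and condition are placeholders, and this assumes the skip-strategy API 
> that is on current master):
> 
> AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
> Pattern<Event, ?> pattern = Pattern.<Event>begin("start", skipStrategy).where(condition);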
> 
> Apart from that, I am still not sure why toString() on SharedBuffer was 
> called to get an OOM to begin with.
> 
> 
> 
> On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com 
> <mailto:vishal.santo...@gmail.com>> wrote:
> We could not recreate it in a controlled setup, but here are a few notes that we 
> have gathered on a simple "times(n).within(..)" pattern.
> 
> In the case where the Event does not create a Final or Stop state:
> 
> * As the NFA processes an Event, it mutates if the Event matches. Each 
> computation is a counter that keeps track of the partial match built up from the 
> matching Events already seen for that computation unit. Essentially, for n 
> Events that all match, there will be roughly n-1 computations, each representing 
> a starting Event from 1 to n-1 (so the first has n-1 events in its partial 
> match, the second has n-2, and so on, until the (n-1)th holds only the last 
> event as a partial match).
> 
> * If the WM progresses beyond the ts of the 1st computation, that partial 
> match is pruned.
> 
> * It makes sure that a SharedBufferEntry is pruned only when the count of Edges 
> originating from it drops to 0 (the internalRemove(), which uses a Stack), 
> which should happen as the WM keeps progressing towards the nth element for 
> unfulfilled patterns. A "null" (not a fan) event is used to establish the WM 
> progression.
> 
> 
> In case there is a FinalState (and we skipToFirstAfterLast):
> 
> * The NFA will prune (release) all partial matches, prune the shared 
> buffer and emit the current match. The computations should now be empty.
> 
> There is a lot to it, but is that roughly what is done in that code?
> 
> 
> 
> Few questions. 
> 
> * What we have seen is that the call to the toString() method of SharedBuffer is 
> where the OOM occurs. In the code there is no call to a logger, so we are not 
> sure why the method is invoked or who calls it. Surely that is not part of the 
> serialization/deserialization routine, or is it (very surprising if it is)?
> * There is no out-of-the-box implementation of an "m out of n" pattern match. 
> We have to resort to requiring m matches within a range of (n * time series 
> slot), which we do (a stripped-down sketch follows below). This is fine, but 
> what it does not allow is an optimization where the NFA prunes as soon as enough 
> false conditions have been seen. Simply speaking, once n-m false events have 
> been seen there is no way that m trues can still occur out of n, and thus the 
> SharedBuffer could be pruned up to the last true seen (very akin to 
> skipToFirstAfterLast).
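> 
> A stripped-down sketch of that workaround (m matching events required within a 
> window sized to the n time-series slots; m, n and slotMinutes are placeholders):
> 
> Pattern<Event, ?> mOutOfN = Pattern.<Event>begin("first").where(condition)
>         .followedBy("rest").where(condition).times(m - 1)
>         .within(Time.minutes(n * slotMinutes));
> 
> Nothing in there lets the NFA give up early once n - m non-matching events have 
> gone by, which is exactly the optimization we are missing.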
> 
> We will keep instrumenting the code ( which apart from the null message is 
> easily understandable ) but would love to hear your feedback. 
> 
> 
> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Thanks a lot Vishal!

Re: Simple CEP pattern

2018-03-07 Thread Kostas Kloudas
Hi Esa,

You could try the examples either from the documentation or from the training.
http://training.data-artisans.com/exercises/CEP.html 
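
If you just want a minimal end-to-end check, something along these lines is enough 
(a rough Java sketch; the Scala API is analogous, and the element values are arbitrary):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> input = env.fromElements("a", "b", "error", "c");

// match any element equal to "error"
Pattern<String, ?> pattern = Pattern.<String>begin("start")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String value) {
                return value.equals("error");
            }
        });

CEP.pattern(input, pattern)
        .select(new PatternSelectFunction<String, String>() {
            @Override
            public String select(Map<String, List<String>> match) {
                return "matched: " + match.get("start").get(0);
            }
        })
        .print();

env.execute("minimal CEP test");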


Kostas

> On Mar 7, 2018, at 11:32 AM, Esa Heikkinen  
> wrote:
> 
> What would be the simplest working CEP (Scala) pattern ?
> 
> I want to test if my CEP application works at all.
>  
> Best, Esa



Re: Questions about the FlinkCEP

2018-03-01 Thread Kostas Kloudas
Hi, 

So yes you can do it with IterativeConditions.
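
For example, a rough sketch in Java (the Event type and getId() are placeholders): the 
condition of a later state can iterate over the events already accepted for an earlier 
state via its Context.

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event value) {
                return value.getId() == 42;
            }
        })
        .followedBy("end")
        .where(new IterativeCondition<Event>() {
            @Override
            public boolean filter(Event value, Context<Event> ctx) throws Exception {
                // compare against the event(s) already accepted for "start"
                for (Event start : ctx.getEventsForPattern("start")) {
                    if (start.getId() == value.getId()) {
                        return true;
                    }
                }
                return false;
            }
        });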

Cheers,
Kostas


> On Mar 1, 2018, at 1:15 PM, Esa Heikkinen <esa.heikki...@student.tut.fi> 
> wrote:
> 
>  
> Hi
>  
> 6) I meant that in the first step the CEP pattern queries value for “Id” and 
> stores the value to (global) variable for later use in the same pattern or 
> even other places in the application. Is this possible ?
>  
> Best, Esa
>  
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
> Sent: Thursday, March 1, 2018 11:35 AM
> To: Esa Heikkinen <esa.heikki...@student.tut.fi>
> Cc: user@flink.apache.org
> Subject: Re: Questions about the FlinkCEP
>  
> Hi Esa,
> 
> The answers to the questions are inlined.
> 
> 
> On Feb 28, 2018, at 8:32 PM, Esa Heikkinen <heikk...@student.tut.fi 
> <mailto:heikk...@student.tut.fi>> wrote:
> 
> Hi
> 
> I have tried to learn FlinkCEP [1], but i have yet not found the clear 
> answers for questions:
> 1) Whether the pattern of CEP is meant only for one data stream at the same 
> time ?
> Yes.
> 
> 2) If i have many different parallel data streams (or sources), should i 
> combine them into one data stream (and is this possible ?), if i want to use 
> same CEP pattern for all parallel streams at the same time ?
> 
> Yes, you should somehow combine them. This can be done with .union, if that is 
> ok for your logic, but how to do it depends on your job. Or you can just apply 
> the same pattern to all your streams separately and then union the results into 
> a single output stream.
> 
> 
> 3) What is the format of data stream of events for CEP ?
>  
> The input can be an arbitrary data stream.
>  
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html>
> https://flink.apache.org/news/2016/04/06/cep-monitoring.html 
> <https://flink.apache.org/news/2016/04/06/cep-monitoring.html>
> https://data-artisans.com/blog/complex-event-processing-flink-cep-update 
> <https://data-artisans.com/blog/complex-event-processing-flink-cep-update>
> 
> 
> 
> 4) Can i directly supply a data stream from env.socketTextStream() to CEP ? 
>  
> Yes. Why don’t you try it out ;)
> 
> 
> 5) Can one event in stream include many "attributes" and values ? Or is it 
> only key-value-pair ? Timestamp of event ?
>  
> An event can have arbitrary format. It is up to you to interpret your data.
> 
> 
> 6) Can CEP save the found values of events for use in later "steps" ? For 
> example in pattern:
> 
> val pattern = Pattern.begin("start").where(_.getId == 42)
>     .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
>     .followedBy("end").where(_.getName == "end")
> Can the value of Id in the "start" step be saved, to check whether the Id is the 
> same in the "end" step?
> 
>  
> If I get the question right, you can do it with Iterative conditions, as 
> described in the documentation.
> 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html>
> 
> Best, Esa
>  
> Best,
> Kostas
> 



Re: Questions about the FlinkCEP

2018-03-01 Thread Kostas Kloudas
Hi Esa,

The answers to the questions are inlined.

> On Feb 28, 2018, at 8:32 PM, Esa Heikkinen  wrote:
> 
> Hi
> 
> I have tried to learn FlinkCEP [1], but i have yet not found the clear 
> answers for questions:
> 1) Whether the pattern of CEP is meant only for one data stream at the same 
> time ?
Yes.
> 2) If i have many different parallel data streams (or sources), should i 
> combine them into one data stream (and is this possible ?), if i want to use 
> same CEP pattern for all parallel streams at the same time ?

Yes, you should somehow combine them. This can be done with .union, if that is ok 
for your logic, but how to do it depends on your job. Or you can just apply the same 
pattern to all your streams separately and then union the results into a single 
output stream.
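
For instance, a rough sketch (the Event type, sources and key are placeholders):

DataStream<Event> streamA = env.addSource(sourceA);
DataStream<Event> streamB = env.addSource(sourceB);

// the streams must have the same type to be unioned
DataStream<Event> all = streamA.union(streamB);

// apply the single pattern once on the combined (here keyed) stream
PatternStream<Event> matches = CEP.pattern(all.keyBy(e -> e.getKey()), pattern);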

> 3) What is the format of data stream of events for CEP ?

The input can be an arbitrary data stream.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html 

https://flink.apache.org/news/2016/04/06/cep-monitoring.html 

https://data-artisans.com/blog/complex-event-processing-flink-cep-update 

> 
> 4) Can i directly supply a data stream from env.socketTextStream() to CEP ? 

Yes. Why don’t you try it out ;)

> 5) Can one event in stream include many "attributes" and values ? Or is it 
> only key-value-pair ? Timestamp of event ?

An event can have arbitrary format. It is up to you to interpret your data.

> 6) Can CEP save the found values of events for use in later "steps" ? For 
> example in pattern:
> 
> val pattern = Pattern.begin("start").where(_.getId == 42)
>     .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
>     .followedBy("end").where(_.getName == "end")
> Can the value of Id in the "start" step be saved, to check whether the Id is the 
> same in the "end" step?
> 

If I get the question right, you can do it with Iterative conditions, as 
described in the documentation.

> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html
> 
> Best, Esa

Best,
Kostas



Re: Important (proposed) CEP changes for Flink 1.5.

2018-02-21 Thread Kostas Kloudas
Hi Vishal,

Yes these changes will only affect CEP.

Cheers,
Kostas

> On Feb 21, 2018, at 5:42 PM, Vishal Santoshi <vishal.santo...@gmail.com> 
> wrote:
> 
> That should be fine by us. We do not plan to do a savepoint and restore on our prod 
> CEP pipeline when we move from 1.4 to 1.5. I do hope that all other pipelines will 
> remain backward compatible (1.4 to 1.5).
> 
> On Wed, Feb 21, 2018 at 5:06 AM, Kostas Kloudas <kklou...@gmail.com 
> <mailto:kklou...@gmail.com>> wrote:
> Hi all,
> 
> Currently due to backwards compatibility there are some issues that seem to 
> be affecting CEP users that use RocksDB. As an example you can see this:
> 
> https://issues.apache.org/jira/browse/FLINK-7756 
> <https://issues.apache.org/jira/browse/FLINK-7756>
> 
> We suspect that the issues have mainly to do with the way state was kept 
> internally in previous versions (using java serialization) and got inherited 
> to the newer ones.
> 
> Given this, we would like to introduce a revamped state serialization 
> strategy which solves the sources of these problems and also reduces the size 
> of the state kept at each checkpoint. The problem with this, is that the 
> state format changes so already existing jobs will not be able to restart 
> from a savepoint taken from a previous version of the library.
> 
> I am writing this email to see if there are any objections to merging these 
> changes into the 1.5 release, which is about to be released. I suggest we wait 
> for responses until Friday 5pm UTC+1, and if there are none, then we merge. If 
> there are any, then we have to see how to proceed.
> 
> You can try out the changes in this branch: 
> https://github.com/aljoscha/flink/commits/fix-flink-cep-serialization 
> <https://github.com/aljoscha/flink/commits/fix-flink-cep-serialization> and 
> report here if you have any problems or suggestions.
> 
> Also feel free to forward this to other users, if I forgot someone.
> 
> Cheers,
> Kostas
> 
> 



Re: Optimizing multiple aggregate queries on a CEP using Flink

2018-02-15 Thread Kostas Kloudas
Hi Sahil,

Currently CEP does not support multi-query optimizations out-of-the-box.
In some cases you can do manual optimizations to your code, but there is 
no optimizer involved.
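
As a rough illustration of such a manual optimization for the 1-minute/2-minute example 
from your first mail (all the names below are placeholders): aggregate the smallest 
granularity once, and derive the larger windows from the pre-aggregated counts instead 
of scanning the raw events twice.

// count cars per sensor once, on 1-minute event-time windows
DataStream<Tuple2<String, Long>> perMinute = cars
        .map(new MapFunction<CarEvent, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(CarEvent event) {
                return Tuple2.of(event.getSensorId(), 1L);
            }
        })
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .sum(1);

// answer the 2-minute query by rolling up the per-minute counts
DataStream<Tuple2<String, Long>> perTwoMinutes = perMinute
        .keyBy(0)
        .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1)))
        .sum(1);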

Cheers,
Kostas

> On Feb 15, 2018, at 11:12 AM, Sahil Arora  wrote:
> 
> Hi Timo,
> Thanks a lot for the help. I will be looking forward to a reply from Kostas 
> to be clearer on this.
>  
> 
> On Mon, 12 Feb 2018, 10:01 pm Timo Walther,  > wrote:
> Hi Sahil,
> 
> I'm not a CEP expert but I will loop in Kostas (in CC). In general, the 
> example that you described can be easily done with a ProcessFunction [1]. A 
> process function not only allows you to keep state (like a count) but also lets 
> you set timers flexibly for specific use cases, so that aggregations can 
> be triggered/reused. So in general I would say that implementing and testing 
> such an algorithm is possible. How easily it can be integrated into the CEP 
> API, I don't know.
> 
> Regards,
> Timo
> 
> 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
>  
> 
> 
> Am 2/9/18 um 11:28 PM schrieb Sahil Arora:
>> Hi there,
>> We have been working on a project with the title "Optimizing Multiple 
>> Aggregate Queries over a Complex Event Processing Engine". The aim is to 
>> optimize a group of queries. For example, "how many cars passed the post in 
>> the past 1 minute" and "how many cars passed the post in the past 2 minutes" 
>> are 2 queries, and the naive and inefficient method is to answer both 
>> queries independently, one by one. However, the optimal way would be to 
>> minimize the computation by taking the answer to query 1 and reusing it in 
>> query 2. This is basically our aim: to minimize computation cost when we have 
>> multiple aggregate queries in a CEP.
>> 
>> We have been searching for some platform which supports CEP, and Flink is 
>> probably one of them. Hence, it would be very helpful if we could get some 
>> answers to the following questions:
>> 
>> 1. Does flink already have some method of optimizing multiple aggregate 
>> queries?
>> 2. Is it possible for us to implement / test such an algorithm in flink 
>> which considers multiple queries in a CEP, like having a database of SQL 
>> queries and testing an algorithm of our choice? 
>> 
>> Any other inputs which may help us with solving the problem would be highly 
>> welcome.
>> 
>> Thanks a lot.
>> -- 
>> Sahil Arora
>> Final year B.Tech Undergrad | Indian Institute of Technology Mandi
>> Web: https://sahilarora535.github.io 
>> LinkedIn: sahilarora535 
>> Ph: +91-8130506047 
> -- 
> Sahil Arora
> Final year B.Tech Undergrad | Indian Institute of Technology Mandi
> Web: https://sahilarora535.github.io 
> LinkedIn: sahilarora535 
> Ph: +91-8130506047 


Re: CEP for time series in csv-file

2018-02-08 Thread Kostas Kloudas
Hi Esa,

I think the best place to start is the documentation available on the Flink 
website.

Some pointers are the following: 

CEP documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html 


Blog post with CEP example: 
https://data-artisans.com/blog/complex-event-processing-flink-cep-update 
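
As a very rough sketch of steps 1-4 in Java (the CsvEvent type, its field layout and 
the paths are made up for illustration; the Scala version is analogous):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<CsvEvent> events = env.readTextFile("file:///path/to/events.csv")
        .map(new MapFunction<String, CsvEvent>() {
            @Override
            public CsvEvent map(String line) {
                String[] fields = line.split(",");
                return new CsvEvent(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        })
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<CsvEvent>() {
            @Override
            public long extractAscendingTimestamp(CsvEvent event) {
                return event.getTimestamp();
            }
        });

// 'pattern' is whatever Pattern<CsvEvent, ?> you want to test
CEP.pattern(events, pattern)
        .select(new PatternSelectFunction<CsvEvent, String>() {
            @Override
            public String select(Map<String, List<CsvEvent>> match) {
                return match.toString();
            }
        })
        .writeAsText("file:///path/to/results");

env.execute("CEP over csv");

As for the timestamp column: anything you can parse into milliseconds since the epoch 
works, since that is what the timestamp extractor has to return.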


Cheers,
Kostas

> On Feb 8, 2018, at 4:28 PM, Esa Heikkinen  
> wrote:
> 
> Hi
>  
> I have csv-file(s) that contain an event in every row, and the first column is 
> the time stamp of the event. The rest of the columns are data and “attributes” 
> of the event.
>  
> I’d want to write simple Scala code that: 1) reads the data of the csv-file, 2) 
> converts it into a form compatible with CEP, 3) sets the pattern for CEP, 4) runs 
> CEP, and 5) writes the results.
>  
> Do you have any hints or examples how to do that ?
>  
> By the way, what kind of time stamp should be in csv-file ?



Re: Flink CEP exception during RocksDB update

2018-02-06 Thread Kostas Kloudas
Hi Varun,

The branch I previously sent you has been now merged to the master.
So could you try the master and tell us if you see any change in the behavior? 
Has the problem been fixed, or has the message of the exception changed?

Thanks, 
Kostas

> On Jan 29, 2018, at 10:09 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi again Varun,
> 
> I am investigating the problem you mentioned and I found a bug in the 
> SharedBuffer, 
> but I am not sure if it is the only bug that affects you.
> 
> Could you please try this branch https://github.com/kl0u/flink/tree/cep-inv 
> <https://github.com/kl0u/flink/tree/cep-inv> and let me know
> if the problem is still there?
> 
> In addition, are you using Scala with case classes or Java?
> 
> Thanks for helping fix the problem,
> Kostas
> 
>> On Jan 24, 2018, at 5:54 PM, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> 
>> Hi Varun,
>> 
>> Thanks for taking time to look into it. Could you give a sample input and 
>> your pattern to reproduce the problem?
>> That would help a lot at figuring out the cause of the problem.
>> 
>> Thanks,
>> Kostas
>> 
>>> On Jan 23, 2018, at 5:40 PM, Varun Dhore <varundhor...@gmail.com 
>>> <mailto:varundhor...@gmail.com>> wrote:
>>> 
>>> Hi Kostas,
>>> 
>>> I was able to reproduce the error with 1.4.0. After upgrading the cluster 
>>> to 1.5 snapshot and running through the same data I am still experiencing 
>>> the same exception. CEP patterns that I am running are using followed by 
>>> patterns e.g AfBfC. From my experience I was never able to get stable 
>>> execution when checkpoints are enabled. When I disable checkpoints CEP jobs 
>>> are running fine. Aside from this particular error I also notice that the 
>>> majority of checkpoints expire as they do not complete within the configured 5 
>>> min timeout period. Any suggestions on further debugging runtime 
>>> checkpoints would be very helpful. 
>>> Thanks in advance for your assistance.
>>> 
>>> Regards,
>>> Varun 
>>> 
>>> On Jan 18, 2018, at 8:11 AM, Kostas Kloudas <k.klou...@data-artisans.com 
>>> <mailto:k.klou...@data-artisans.com>> wrote:
>>> 
>>>> Thanks a lot Varun!
>>>> 
>>>> Kostas
>>>> 
>>>>> On Jan 17, 2018, at 9:59 PM, Varun Dhore <varundhor...@gmail.com 
>>>>> <mailto:varundhor...@gmail.com>> wrote:
>>>>> 
>>>>> Thank you Kostas. Since this error is not easily reproducible on my end 
>>>>> I’ll continue testing this and confirm the resolution once I am able to 
>>>>> do so.
>>>>> 
>>>>> Thanks,
>>>>> Varun 
>>>>> 
>>>>> Sent from my iPhone
>>>>> 
>>>>> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <k.klou...@data-artisans.com 
>>>>> <mailto:k.klou...@data-artisans.com>> wrote:
>>>>> 
>>>>>> Hi Varun,
>>>>>> 
>>>>>> This can be related to this issue: 
>>>>>> https://issues.apache.org/jira/browse/FLINK-8226 
>>>>>> <https://issues.apache.org/jira/browse/FLINK-8226>
>>>>>> which is currently fixed on the master.
>>>>>> 
>>>>>> Could you please try the current master to see if the error persists?
>>>>>> 
>>>>>> Thanks,
>>>>>> Kostas
>>>>>> 
>>>>>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore <varundhor...@gmail.com 
>>>>>>> <mailto:varundhor...@gmail.com>> wrote:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> Hello Flink community,
>>>>>>>>  
>>>>>>>> I have encountered following exception while testing 1.4.0 release. 
>>>>>>>> This error is occurring intermittently and my CEP job keeps restarting 
>>>>>>>> after this exception. I am running the job with Event time semantics 
>>>>>>>> and checkpoints enabled.
>>>>>>>>  
>>>>>>>>  
>>>>>>>> java.lang.RuntimeException: Exception occurred while 
>>>>>>>> processing valve output

Re: CEP issue

2018-02-06 Thread Kostas Kloudas
Thanks a lot Vishal! 

We are looking forward to a test case that reproduces the failure.

Kostas

> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi  wrote:
> 
> This is the pattern. Will create a test case. 
> /**
>  * @param condition a single condition applied as the acceptance criterion
>  * @param params    defining the bounds of the pattern.
>  * @param           the element in the stream
>  * @return the compiled pattern along with the params.
>  */
> public static  RelaxedContiguousPattern of(SimpleCondition condition,
>                                            RelaxedContiguityWithinTime params,
>                                            RichMapFunction, List> mapFunc,
>                                            String patternId) {
>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>     Pattern pattern = Pattern.begin(START).where(condition);
>     if (params.elementCount > 1) {
>         pattern = pattern.followedBy(REST).where(condition).times(params.elementCount - 1);
>     }
>     return new RelaxedContiguousPattern(
>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>             params,
>             params.elementCount > 1,
>             params.period.duration,
>             mapFunc,
>             patternId);
> }
> 
> 
> 
> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz  > wrote:
> Could you provide some example to reproduce the case? Or the Pattern that you 
> are using? It would help track down the issue.
> 
> > On 2 Feb 2018, at 13:35, Vishal Santoshi  > > wrote:
> >
> > I have pulled in the flink master cep library and the runtime ( the cluster 
> > ) is configured to work against the latest and greatest. This does not 
> > happen with smaller range patterns ( 3 out of 5 , 1 of 3 etc) but is always 
> > an issue when it is a larger range ( 20 out of 25 with range of 8 hours ) . 
> > Does that make sense?
> >
> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz 
> > > wrote:
> > This problem sounds very similar to this one that was fixed for 1.4.1 and 
> > 1.5.0:
> > https://issues.apache.org/jira/browse/FLINK-8226 
> > 
> >
> > Could you check if that helps with your problem too?
> >
> > > On 1 Feb 2018, at 23:34, Vishal Santoshi  > > > wrote:
> > >
> > > I have flink master CEP library code imported to  a 1.4 build.
> > >
> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi 
> > > > wrote:
> > > A new one
> > >
> > > java.lang.OutOfMemoryError: Java heap space
> > >   at java.util.Arrays.copyOf(Arrays.java:3332)
> > >   at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> > >   at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:136)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:131)
> > >   at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
> > >   at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> > >   at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> > >   at java.lang.String.valueOf(String.java:2994)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:131)
> > >   at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > >   at java.lang.String.valueOf(String.java:2994)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:131)
> > >   at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
> > >   at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
> > >   at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
> > >   at java.lang.String.valueOf(String.java:2994)
> > >   at java.lang.StringBuilder.append(StringBuilder.java:131)
> > >   at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
> > > .
> > > .
> > > .
> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop?
> > >
> > >
> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi 
> > > 

Re: Flink CEP exception during RocksDB update

2018-01-29 Thread Kostas Kloudas
Hi again Varun,

I am investigating the problem you mentioned and I found a bug in the 
SharedBuffer, 
but I am not sure if it is the only bug that affects you.

Could you please try this branch https://github.com/kl0u/flink/tree/cep-inv 
<https://github.com/kl0u/flink/tree/cep-inv> and let me know
if the problem is still there?

In addition, are you using Scala with case classes or Java?

Thanks for helping fix the problem,
Kostas

> On Jan 24, 2018, at 5:54 PM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Varun,
> 
> Thanks for taking time to look into it. Could you give a sample input and 
> your pattern to reproduce the problem?
> That would help a lot at figuring out the cause of the problem.
> 
> Thanks,
> Kostas
> 
>> On Jan 23, 2018, at 5:40 PM, Varun Dhore <varundhor...@gmail.com 
>> <mailto:varundhor...@gmail.com>> wrote:
>> 
>> Hi Kostas,
>> 
>> I was able to reproduce the error with 1.4.0. After upgrading the cluster to 
>> 1.5 snapshot and running through the same data I am still experiencing the 
>> same exception. CEP patterns that I am running are using followed by 
>> patterns e.g AfBfC. From my experience I was never able to get stable 
>> execution when checkpoints are enabled. When I disable checkpoints CEP jobs 
>> are running fine. Aside from this particular error I also notice that the 
>> majority of checkpoints expire as they do not complete within the configured 5 
>> min timeout period. Any suggestions on further debugging runtime checkpoints 
>> would be very helpful. 
>> Thanks in advance for your assistance.
>> 
>> Regards,
>> Varun 
>> 
>> On Jan 18, 2018, at 8:11 AM, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> 
>>> Thanks a lot Varun!
>>> 
>>> Kostas
>>> 
>>>> On Jan 17, 2018, at 9:59 PM, Varun Dhore <varundhor...@gmail.com 
>>>> <mailto:varundhor...@gmail.com>> wrote:
>>>> 
>>>> Thank you Kostas. Since this error is not easily reproducible on my end 
>>>> I’ll continue testing this and confirm the resolution once I am able to do 
>>>> so.
>>>> 
>>>> Thanks,
>>>> Varun 
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <k.klou...@data-artisans.com 
>>>> <mailto:k.klou...@data-artisans.com>> wrote:
>>>> 
>>>>> Hi Varun,
>>>>> 
>>>>> This can be related to this issue: 
>>>>> https://issues.apache.org/jira/browse/FLINK-8226 
>>>>> <https://issues.apache.org/jira/browse/FLINK-8226>
>>>>> which is currently fixed on the master.
>>>>> 
>>>>> Could you please try the current master to see if the error persists?
>>>>> 
>>>>> Thanks,
>>>>> Kostas
>>>>> 
>>>>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore <varundhor...@gmail.com 
>>>>>> <mailto:varundhor...@gmail.com>> wrote:
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> Hello Flink community,
>>>>>>>  
>>>>>>> I have encountered following exception while testing 1.4.0 release. 
>>>>>>> This error is occurring intermittently and my CEP job keeps restarting 
>>>>>>> after this exception. I am running the job with Event time semantics 
>>>>>>> and checkpoints enabled.
>>>>>>>  
>>>>>>>  
>>>>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>>>>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>>>>   at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>>>>   at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)

Re: Flink CEP exception during RocksDB update

2018-01-24 Thread Kostas Kloudas
Hi Varun,

Thanks for taking time to look into it. Could you give a sample input and your 
pattern to reproduce the problem?
That would help a lot at figuring out the cause of the problem.

Thanks,
Kostas

> On Jan 23, 2018, at 5:40 PM, Varun Dhore <varundhor...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> I was able to reproduce the error with 1.4.0. After upgrading the cluster to 
> 1.5 snapshot and running through the same data I am still experiencing the 
> same exception. CEP patterns that I am running are using followed by patterns 
> e.g AfBfC. From my experience I was never able to get stable execution when 
> checkpoints are enabled. When I disable checkpoints CEP jobs are running 
> fine. Aside from this particular error I also notice that the majority of 
> checkpoints expire as they do not complete within the configured 5 min timeout 
> period. Any suggestions on further debugging runtime checkpoints would be 
> very helpful. 
> Thanks in advance for your assistance.
> 
> Regards,
> Varun 
> 
> On Jan 18, 2018, at 8:11 AM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> 
>> Thanks a lot Varun!
>> 
>> Kostas
>> 
>>> On Jan 17, 2018, at 9:59 PM, Varun Dhore <varundhor...@gmail.com 
>>> <mailto:varundhor...@gmail.com>> wrote:
>>> 
>>> Thank you Kostas. Since this error is not easily reproducible on my end 
>>> I’ll continue testing this and confirm the resolution once I am able to do 
>>> so.
>>> 
>>> Thanks,
>>> Varun 
>>> 
>>> Sent from my iPhone
>>> 
>>> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <k.klou...@data-artisans.com 
>>> <mailto:k.klou...@data-artisans.com>> wrote:
>>> 
>>>> Hi Varun,
>>>> 
>>>> This can be related to this issue: 
>>>> https://issues.apache.org/jira/browse/FLINK-8226 
>>>> <https://issues.apache.org/jira/browse/FLINK-8226>
>>>> which is currently fixed on the master.
>>>> 
>>>> Could you please try the current master to see if the error persists?
>>>> 
>>>> Thanks,
>>>> Kostas
>>>> 
>>>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore <varundhor...@gmail.com 
>>>>> <mailto:varundhor...@gmail.com>> wrote:
>>>>> 
>>>>> 
>>>>> 
>>>>>> Hello Flink community,
>>>>>>  
>>>>>> I have encountered following exception while testing 1.4.0 release. This 
>>>>>> error is occurring intermittently and my CEP job keeps restarting after 
>>>>>> this exception. I am running the job with Event time semantics and 
>>>>>> checkpoints enabled.
>>>>>>  
>>>>>>  
>>>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>>>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>>>   at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>>>   at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>>>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
>>>>>>   at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)

Re: CEP issue in 1.3.2. Does 1.4 fix this ?

2018-01-23 Thread Kostas Kloudas
Hi Vishal,

Thanks for checking and glad to hear that your job works after the fix!

As for the equals/hashCode question: if you are asking whether you have to 
implement an exact equals() method and the corresponding hashCode(),
then the answer is yes. These methods are used when retrieving and cleaning up 
“outdated” data from FlinkCEP’s internal data structures.
As a consequence, ambiguous implementations can lead to the wrong elements 
being cleaned up.

Thanks, 
Kostas

> On Jan 21, 2018, at 3:32 PM, Vishal Santoshi  
> wrote:
> 
> Have tested against the 1.5 SNAPSHOT (I simply pulled the source code into 
> my distribution and compiled it into my job jar). Both the test code and the 
> cluster seem to work ok. Have not tested the "savepoint and resume" mode, 
> but restore from checkpoint works. I brought the JM down and restarted it. I 
> have to sanitize the output, but at least the exception is not thrown.
> 
> One thing though, and please confirm:
> 
> In CEP it seems that a POJO pushed into the window as part of a pattern match 
> has to have an "exact" equals/hashCode. In my case I had a custom 
> equals/hashCode for enabling "contains" in a different context, i.e. I had 
> deliberately not included an instance variable in the equals/hashCode 
> contract. Is that a design decision or a requirement in CEP?
> 
> Thanks and Regards.
> 
> 
> 
> 
> On Sun, Jan 14, 2018 at 12:27 PM, Vishal Santoshi  > wrote:
> Will do.
> 
> On Sun, Jan 14, 2018 at 10:23 AM, Fabian Hueske  > wrote:
> We don't have a schedule for bugfix releases but do them based on need.
> AFAIK, a discussion about a 1.4.1 release has not been started yet.
> 
> Would you like to kick that off by sending a mail to the dev mailing list?
> 
> 
> 2018-01-12 16:41 GMT+01:00 Vishal Santoshi  >:
> Thanks.  We will.
> 
>When is 1.4.1 scheduled for release ? 
> 
> On Fri, Jan 12, 2018 at 3:24 AM, Dawid Wysakowicz  > wrote:
> Hi Vishal,
> I think it might be due to this bug: 
> https://issues.apache.org/jira/browse/FLINK-8226 
> 
> It was merged for 1.4.1 and 1.5.0. Could you check with this changes applied? 
> Would be really helpful. If the error still persists could you file a jira?
> 
> Regards
> Dawid
> 
> > On 11 Jan 2018, at 19:49, Vishal Santoshi  > > wrote:
> >
> > When checkpointing is turned on a simple CEP loop pattern
> >
> >  private Pattern, ?> alertPattern = 
> > Pattern. > SimpleBinaryEvent>>begin("start").where(checkStatusOn)
> > .followedBy("middle").where(checkStatusOn).times(2)
> > .next("end").where(checkStatusOn).within(Time.minutes(5))
> >
> > I see failures.
> >
> > SimpleBinaryEvent is
> >
> > public class SimpleBinaryEvent implements Serializable {
> >
> > private int id;
> > private int sequence;
> > private boolean status;
> > private long time;
> >
> > public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
> >
> > this.id 
> >  = id;
> > this.sequence = sequence;
> > this.status = status;
> > this.time = time;
> > }
> > public int getId() {
> > return id;
> > }
> > public int getSequence() {
> > return sequence;
> > }
> > public boolean isStatus() {
> > return status;
> > }
> > public long getTime() {
> > return time;
> > }
> > @Override
> > public boolean equals(Object o) {
> > if (this == o) return true;
> > if (o == null || getClass() != o.getClass()) return false;
> >
> > SimpleBinaryEvent that = (SimpleBinaryEvent) o;
> >
> > if (getId() != that.getId()) return false;
> > if (isStatus() != that.isStatus()) return false;
> > if (getSequence() != that.getSequence()) return false;
> > return getTime() == that.getTime();
> > }
> >
> > @Override
> > public int hashCode() {
> > //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
> > int result = getId();
> > result = 31 * result + (isStatus() ? 1 : 0);
> > result = 31 * result + getSequence();
> > result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
> > return result;
> > }
> >
> > @Override
> > public String toString() {
> > return "SimpleBinaryEvent{" +
> > "id='" + id + '\'' +
> > ", status=" + status +
> > ", sequence=" + sequence +
> > ", time=" + time +
> > '}';
> > }
> >
> > }
> >
> > failure cause:
> >
> > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> > operator KeyedCEPPatternOperator -> Map (1/1).
> > ... 6 more
> > Caused by: java.util.concurrent.ExecutionException: 
> > 

Re: Unable to query MapState

2018-01-22 Thread Kostas Kloudas
Hi Velu,

I would recommend switching to Flink 1.4, as queryable state has been 
refactored to be compatible with all types of state.
You can read more here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
 

In addition, a lot of things have been simplified.

And for an example you can use this link: 
https://github.com/apache/flink/blob/a3fd548e9c76c67609bbf159d5fb743d756450b1/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java#L804

which is directly from the Queryable State IT cases.
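
Roughly, querying your "mapQuery" state with the 1.4 client then looks like the sketch 
below (the proxy host/port, jobId and key are placeholders; the descriptor and the key 
type have to match exactly what the job uses):

QueryableStateClient client = new QueryableStateClient("proxy-host", 9069);

MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>(
        "mapQuery",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO);

CompletableFuture<MapState<String, Long>> future = client.getKvState(
        jobId,                            // JobID of the running job
        "mapQuery",                       // the name passed to setQueryable()
        key,                              // the key of the keyed stream
        BasicTypeInfo.STRING_TYPE_INFO,   // TypeInformation of that key
        descriptor);

MapState<String, Long> state = future.join();
Long count = state.get("someApiName");

Note that with keyBy(0) the key of your stream is a Tuple rather than a plain String, 
so it is usually easier to key the stream with a KeySelector that returns the merchant 
id directly and query with that.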

Thanks,
Kostas

> On Jan 22, 2018, at 2:38 PM, Velu Mitwa  wrote:
> 
> Hi,
> I am trying to query Flink's MapState from Flink client (1.3.2). I was able 
> to query ValueState but when I tried to query MapState I am getting an 
> exception. 
> 
> java.io.IOException: Unconsumed bytes in the deserialized value. This 
> indicates a mismatch in the value serializers used by the KvState instance 
> and this access.
> at 
> org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
> at 
> com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
> at 
> com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
> at 
> org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
> at 
> org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
> at 
> org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
> at 
> org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
> 
> Flink Job's Logic
> 
> FlinkKafkaConsumer09 consumer = new 
> FlinkKafkaConsumer09<>(
> "/apps/application-stream:flink-demo", new MerchantApiSchema(), 
> properties);
> 
> DataStream inputEventStream = env.addSource(consumer);
> 
> DataStream> outputStream =
> inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
> .window(SlidingProcessingTimeWindows.of(Time.seconds(120), 
> Time.milliseconds(1000)))
> .sum(2);
> 
> DataStream output = outputStream.keyBy(0).flatMap(new CountEvent());
> 
> output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);
> 
> // execute program
> env.execute("Filter Transformation Example");
> 
>   }
> 
> 
>   public static class CreateTuple
>   implements MapFunction> {
> @Override
> public Tuple3 map(MerchantApiEvent input) throws 
> Exception {
>   return new Tuple3(input.getMerchantId(), 
> input.getApiName(), 1L);
> }
> 
>   }
> 
>   public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {
> 
>     private transient MapState<String, Long> mapState;
> 
>     @Override
>     public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {
> 
>       mapState.put(input.f1, input.f2);
> 
>     }
> 
>     @Override
>     public void open(Configuration config) {
> 
>       MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<>(
>           "mapQuery", TypeInformation.of(new TypeHint<String>() {
>           }), TypeInformation.of(new TypeHint<Long>() {
>           }));
>       mapStateDesc.setQueryable("mapQuery");
> 
>       mapState = getRuntimeContext().getMapState(mapStateDesc);
> 
>     }
>   }
> 
> 
> Flink Query Client's Logic
> 
> final JobID jobId = JobID.fromHexString(jobIdParam);
> 
> String key = queryStateRequestDto.getKey();
> 
> final Configuration config = new Configuration();
> config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
> config.setInteger(JobManagerOptions.PORT, jobManagerPort);
> 
> HighAvailabilityServices highAvailabilityServices = null;
> try {
>   highAvailabilityServices = 
> 

Re: Flink CEP exception during RocksDB update

2018-01-18 Thread Kostas Kloudas
Thanks a lot Varun!

Kostas

> On Jan 17, 2018, at 9:59 PM, Varun Dhore <varundhor...@gmail.com> wrote:
> 
> Thank you Kostas. Since this error is not easily reproducible on my end I’ll 
> continue testing this and confirm the resolution once I am able to do so.
> 
> Thanks,
> Varun 
> 
> Sent from my iPhone
> 
> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> 
>> Hi Varun,
>> 
>> This can be related to this issue: 
>> https://issues.apache.org/jira/browse/FLINK-8226 
>> <https://issues.apache.org/jira/browse/FLINK-8226>
>> which is currently fixed on the master.
>> 
>> Could you please try the current master to see if the error persists?
>> 
>> Thanks,
>> Kostas
>> 
>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore <varundhor...@gmail.com 
>>> <mailto:varundhor...@gmail.com>> wrote:
>>> 
>>> 
>>> 
>>>> Hello Flink community,
>>>>  
>>>> I have encountered following exception while testing 1.4.0 release. This 
>>>> error is occurring intermittently and my CEP job keeps restarting after 
>>>> this exception. I am running the job with Event time semantics and 
>>>> checkpoints enabled.
>>>>  
>>>>  
>>>> java.lang.RuntimeException: Exception occurred while 
>>>> processing valve output watermark:
>>>> at 
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>> at 
>>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>> at 
>>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>> at 
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>> at 
>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>> at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>> at 
>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.RuntimeException: Error while adding data 
>>>> to RocksDB
>>>> at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
>>>> at 
>>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>>>> at 
>>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248)
>>>> at 
>>>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>> at 
>>>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>> at 
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>> at 
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>> ... 7 more
>>>> Caused by: java.lang.IllegalStateException: Could not find id 
>>>> for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: ”e1”, 
>>>> timestamp: 1515593398897), 1515593398897, 0), [SharedBufferEdge(null, 1)], 
>>>> 2)
>>>> at 
>>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>> at 
>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:972)
>>>> at 
>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:839)
>>>> at 
>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:919)
>>>> at 
>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:839)
>>>> at 
>>>> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
>>>> ... 13 more
>>>>  
>>>>  
>>>> Thanks,
>>>> Varun
>> 



Re: Flink CEP exception during RocksDB update

2018-01-15 Thread Kostas Kloudas
Hi Varun,

This can be related to this issue: 
https://issues.apache.org/jira/browse/FLINK-8226 

which is currently fixed on the master.

Could you please try the current master to see if the error persists?

Thanks,
Kostas

> On Jan 15, 2018, at 4:09 PM, Varun Dhore  wrote:
> 
> 
> 
>> Hello Flink community,
>>  
>> I have encountered following exception while testing 1.4.0 release. This 
>> error is occurring intermittently and my CEP job keeps restarting after this 
>> exception. I am running the job with Event time semantics and checkpoints 
>> enabled.
>>  
>>  
>> java.lang.RuntimeException: Exception occurred while processing 
>> valve output watermark:
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>> at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>> at 
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error while adding data 
>> to RocksDB
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
>> at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>> at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248)
>> at 
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>> at 
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>> ... 7 more
>> Caused by: java.lang.IllegalStateException: Could not find id 
>> for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: ”e1”, 
>> timestamp: 1515593398897), 1515593398897, 0), [SharedBufferEdge(null, 1)], 2)
>> at 
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>> at 
>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:972)
>> at 
>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:839)
>> at 
>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:919)
>> at 
>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:839)
>> at 
>> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
>> ... 13 more
>>  
>>  
>> Thanks,
>> Varun



Re: How to apply patterns from a source onto another datastream?

2017-12-28 Thread Kostas Kloudas
Hi Jayant, 

As Dawid said, dynamically updating patterns is currently not supported.
There is also this question raised on the dev mailing list with 
the subject "CEP: Dynamic Patterns".

I will repeat my answer here so that we are on the same page: 

"To support this, we need 2 features with one having to be added in Flink 
itself,
and the other to the CEP library.

The first one is broadcast state and the ability to connect keyed and non-keyed 
streams. This one is to be added to Flink itself and the good news are that 
this 
feature is scheduled to be added to Flink 1.5.

The second feature is to modify the CEP operator so that it can support 
multiple 
patterns and match incoming events against all of them. For this I have no 
clear 
deadline in my mind, but given that there are more and more people asking for 
it, I think it is going to be added soon."


Now for implementing CEP on top of Flink’s windowing mechanism, I would not
consider it as straight-forward as it sounds. Conceptually, CEP forms windows 
and within these windows you search for matching patterns. But CEP’s windowing 
semantics differ drastically from Flink’s windowing. Windows in CEP are created
whenever an event that matches the first element in a pattern comes. In Flink,
a window is created and is considered complete either based on time (tumbling/
sliding) or when a specific time interval expires without any activity in the 
stream (session).
So in one case (CEP), "window" boundaries are defined based on event properties 
while in the 
other (Flink Windowing), they are specified based on time. In addition, in CEP 
the order 
in which elements are consumed matters, as the pattern is essentially a state 
machine. 
In Flink, elements are added to a window in the order that they arrive.

In any case, I would consider an effort to re-implement CEP on top of Flink’s 
windowing
far from trivial. If your use case is simple and fits into Flink’s windowing 
semantics, then
go ahead. But if not, I would recommend waiting a bit more for the feature to 
be 
supported by the library.

Regards, 
Kostas


> On Dec 25, 2017, at 5:53 AM, Jayant Ameta  wrote:
> 
> Hi Dawid,
> Since dynamic patterns are not available in Flink CEP, I am thinking about 
> skipping CEP altogether and mimicking the functionality using windowed 
> streams. 
> I am mostly interested in the times and within methods. Basically, rewriting my 
> own logic on a windowed stream to match the pattern and count the number of 
> matching events within a time window.
> Do you know if there is any similar example in the docs?
> 
> Jayant Ameta
> 
> On Fri, Dec 22, 2017 at 1:24 PM, Dawid Wysakowicz  > wrote:
> Hi Jayant,
> 
> Could you elaborate a bit more what you mean? Flink’s windows are not used in 
> Flink CEP. They are a different concept.
> 
> > On 20 Dec 2017, at 09:23, Jayant Ameta  > > wrote:
> >
> > Would it be possible to get the same result using windows?
> >
> > Jayant Ameta
> >
> > On Tue, Dec 19, 2017 at 3:23 PM, Dawid Wysakowicz 
> > > wrote:
> > It is not possible at this moment. FlinkCEP can handle only one Pattern 
> > applied statically. There is a JIRA ticket for that: 
> > https://issues.apache.org/jira/browse/FLINK-7129 
> >  .
> >
> > > On 19 Dec 2017, at 10:10, Jayant Ameta  > > > wrote:
> > >
> > > I've a datastream of events, and another datastream of patterns. The 
> > > patterns are provided by users at runtime, and they need to come via a 
> > > Kafka topic. I need to apply each of the pattern on the event stream 
> > > using Flink-CEP. Is there a way to get a PatternStream from the 
> > > DataStream when I don't know the pattern beforehand?
> > >
> > > https://stackoverflow.com/questions/47883408/apache-flink-how-to-apply-patterns-from-a-source-onto-another-datastream
> > >  
> > > 
> >
> >
> 
> 



Re: Triggers in Flink CEP

2017-12-19 Thread Kostas Kloudas
Hi Shailesh,

The pattern operator does not use Flink’s windowing mechanism internally.
Conceptually you may think that there are windows in both, and this is true, 
but there 
are significant differences that prevent using Flink windowing for CEP.

The above also implies that using triggers for early firing is not supported 
and is far from trivial to implement.

Thanks,
Kostas

> On Dec 19, 2017, at 5:27 PM, Shailesh Jain  
> wrote:
> 
> Hi,
> 
> Similar to the way it is exposed in Windows operator, is it possible to use 
> Triggers inside the Pattern Operator to fire partially matched patterns (when 
> certain events are very late and we want some level of controlled early 
> evaluation)?
> 
> I assume that Windows are used internally to implement the CEP library, so 
> wanted to know how much of a work will it be to extend the library to expose 
> this functionality.
> 
> Thanks,
> Shailesh



Re: Hardware Reference Architecture

2017-12-11 Thread Kostas Kloudas
Hi Hayden,

This is a talk from Flink Forward that may be of help to you:
https://www.youtube.com/watch?v=8l8dCKMMWkw 
<https://www.youtube.com/watch?v=8l8dCKMMWkw>

and here are the slides:
www.slideshare.net/FlinkForward/flink-forward-berlin-2017-robert-metzger-keep-it-going-how-to-reliably-and-efficiently-operate-apache-flink/3
 
<http://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-robert-metzger-keep-it-going-how-to-reliably-and-efficiently-operate-apache-flink/3>

Kostas

> On Dec 7, 2017, at 6:36 PM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Hayden,
> 
> It would be nice if you could share a bit more details about your use case 
> and the load that you expect to have,
> as this could allow us to have a better view of your needs.
> 
> As a general set of rules:
> 1) I would say that the bigger your cluster (in terms of resources, not 
> necessarily machines) the better.
> 2) the more the RAM per machine the better, as this will allow to fit more 
> things in memory without spilling to disk
> 3) in the dilemma between few powerful machines vs a lot of small ones, I 
> would go more towards the first, as this 
> allows for smaller network delays.
> 
> Once again, the above rules are just general recommendations and more details 
> about your workload will give us 
> more information to work with.
> 
> In the documentation here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#background--internals
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#background--internals>
> you can find some details about deployment, monitoring, etc.
> 
> I hope this helps,
> Kostas
> 
>> On Dec 7, 2017, at 1:53 PM, Marchant, Hayden <hayden.march...@citi.com 
>> <mailto:hayden.march...@citi.com>> wrote:
>> 
>> Hi,
>> 
>> I'm looking for guidelines for Reference architecture for Hardware for a 
>> small/medium Flink cluster - we'll be installing on in-house bare-metal 
>> servers. I'm looking for guidance for:
>> 
>> 1. Number and spec of  CPUs
>> 2. RAM
>> 3. Disks
>> 4. Network
>> 5. Proximity of servers to each other
>> 
>> (Most likely, we will choose YARN as a cluster manager for Flink)
>> 
>> If someone can share a document or link with relevant information, I will be 
>> very grateful.
>> 
>> Thanks,
>> Hayden Marchant
>> 
> 



Re: Hardware Reference Architecture

2017-12-07 Thread Kostas Kloudas
Hi Hayden,

It would be nice if you could share a bit more details about your use case and 
the load that you expect to have,
as this could allow us to have a better view of your needs.

As a general set of rules:
1) I would say that the bigger your cluster (in terms of resources, not 
necessarily machines) the better.
2) the more the RAM per machine the better, as this will allow to fit more 
things in memory without spilling to disk
3) in the dilemma between few powerful machines vs a lot of small ones, I would 
go more towards the first, as this 
allows for smaller network delays.

Once again, the above rules are just general recommendations and more details 
about your workload will give us 
more information to work with.

In the documentation here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#background--internals
 

you can find some details about deployment, monitoring, etc.

I hope this helps,
Kostas

> On Dec 7, 2017, at 1:53 PM, Marchant, Hayden  wrote:
> 
> Hi,
> 
> I'm looking for guidelines for Reference architecture for Hardware for a 
> small/medium Flink cluster - we'll be installing on in-house bare-metal 
> servers. I'm looking for guidance for:
> 
> 1. Number and spec of  CPUs
> 2. RAM
> 3. Disks
> 4. Network
> 5. Proximity of servers to each other
> 
> (Most likely, we will choose YARN as a cluster manager for Flink)
> 
> If someone can share a document or link with relevant information, I will be 
> very grateful.
> 
> Thanks,
> Hayden Marchant
> 



Re: Testing CoFlatMap correctness

2017-12-07 Thread Kostas Kloudas
Hi Tovi,

What you need is the TwoInputStreamOperatorTestHarness. This will allow you to 
do something like:

TwoInputStreamOperatorTestHarness testHarness =
  new TwoInputStreamOperatorTestHarness<>(myoperator);

testHarness.setup();
testHarness.open();

testHarness.processWatermark1(new Watermark(17));
testHarness.processWatermark2(new Watermark(17));
testHarness.processElement1(new StreamRecord<>(5, 12L));

testHarness.processWatermark1(new Watermark(42));
testHarness.processWatermark2(new Watermark(42));
testHarness.processElement2(new StreamRecord<>("6", 13L));

and then use testHarness.getOutput() to get the output and compare it against 
the expected one.
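
For completeness, a minimal sketch of how that comparison could look (the expected
records below are purely illustrative, and TestHarnessUtil comes from Flink's test
utilities; a plain assertion on the output queue works just as well):

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new Watermark(17));
expectedOutput.add(new StreamRecord<>("some-result", 12L));
expectedOutput.add(new Watermark(42));

TestHarnessUtil.assertOutputEquals("Output mismatch", expectedOutput, testHarness.getOutput());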

If you have access to the Flink source code, I would recommend you to have a 
look at the CoProcessOperatorTest for an example.

Or you can find it here: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
 


Hope this helps,
Kostas


> On Dec 7, 2017, at 5:54 PM, Sofer, Tovi  wrote:
> 
> Hi group,
> 
>  
> 
> What is the best practice for testing CoFlatMap operator correctness?
> 
> We have two source functions, each emitting data to a stream, and a connect 
> between them, and I want to make sure that when a streamA element arrives before 
> an element of the other stream, a certain behavior happens.
> 
> How can I test this case?
> 
> Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> and emitting a timestamp and watermark per element didn’t help, and still each 
> element arrives in an unexpected order.
>  
> Thanks in advance,
> Tovi



Re: Maintain heavy hitters in Flink application

2017-12-07 Thread Kostas Kloudas
Hi Max,

You are right that Queryable State is not designed to be used as a means for a 
job to query its own state.
In fact, given that you do not know the jobId of your job from within the job 
itself, I do not think you can use 
queryable state in your scenario.

What you can do is to have a flatMap computing the hot keys or heavy hitters, 
and emit as main output the 
elements themselves for further processing, and as a side output the computed 
statistics. The side output 
is a data stream itself so you can store it in an external storage system (e.g. 
a KV store) and use AsyncIO to 
query that system downstream. This will solve the problem of having access to 
the state from all tasks. 

This is a simple solution but I am not sure about the performance implications. 
You can try it to see if it actually fits your needs.
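
Just to illustrate the idea, here is a rough sketch (side outputs are exposed through
a ProcessFunction, so the sketch uses one in place of a plain flatMap; the "Event"
type, getKey() and the statistic itself are placeholders, not something from your job):

final OutputTag<Tuple2<String, Long>> statsTag = new OutputTag<Tuple2<String, Long>>("stats") {};

SingleOutputStreamOperator<Event> mainOutput = events
    .keyBy(Event::getKey)
    .process(new ProcessFunction<Event, Event>() {
        // illustrative in-memory counts; keyed state would be the fault-tolerant choice
        private transient Map<String, Long> counts;

        @Override
        public void open(Configuration parameters) {
            counts = new HashMap<>();
        }

        @Override
        public void processElement(Event value, Context ctx, Collector<Event> out) {
            long count = counts.merge(value.getKey(), 1L, Long::sum);
            out.collect(value);                                      // main output: the elements themselves
            ctx.output(statsTag, Tuple2.of(value.getKey(), count));  // side output: the computed statistics
        }
    });

DataStream<Tuple2<String, Long>> stats = mainOutput.getSideOutput(statsTag);
// write "stats" to the external KV store and query it with AsyncIO further downstream.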

Thanks, 
Kostas


> On Dec 5, 2017, at 10:32 AM, Fabian Hueske  wrote:
> 
> Hi,
> 
> I haven't done that before either. The query API will change with the next 
> version (Flink 1.4.0) which is currently being prepared for releasing.
> Kostas (in CC) might be able to help you.
> 
> Best, Fabian
> 
> 2017-12-05 9:52 GMT+01:00 m@xi  >:
> Hi Fabian,
> 
> Thanks for your answer. Initially, I have excluded Queryable State as an
> option as it explicitly mentioned that it is used for querying state outside
> flink.
> 
> Now that I am reading the documentation I am not sure how I may achieve
> that. I have to set ports and addresses which I am not sure I should since I
> am reading the queryable state from inside the same job.
> 
> Can you or someone elaborate further how can I read the queryable state of a
> specific task from another task (e.g. map).
> 
> Best,
> Max
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 



Re: Are there plans to support Hadoop 2.9.0 on near future?

2017-11-29 Thread Kostas Kloudas
Hi Oriol,

This estimation is not accurate and the whole plan is a bit outdated.
This was based on an outdated time-based release model that the community tried 
but without the expected results,
so we changed it.

You can follow the release voting for 1.4 in the dev mailing list. And the 
archived discussion is here:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-4-0-release-candidate-2-td20291.html
 

Cheers,
Kostas

> On Nov 29, 2017, at 11:27 AM, ORIOL LOPEZ SANCHEZ 
> <oriol.lopezsanc...@telefonica.com> wrote:
> 
> Thanks, it helped a lot!
> 
> But I've seen on 
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Release+and+Feature+Plan#FlinkReleaseandFeaturePlan-Flink1.4
>  
>  that they estimated releasing 1.4 at September. Do you know if it will be 
> released this year or we may have to wait longer?
> 
> Thanks a lot.
> 
> De: Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>>
> Enviat el: dimecres, 29 de novembre de 2017 11:15:16
> Per a: ORIOL LOPEZ SANCHEZ
> A/c: user@flink.apache.org <mailto:user@flink.apache.org>
> Tema: Re: Are there plans to support Hadoop 2.9.0 on near future?
>  
> Hi Oriol,
> 
> As you may have seen from the mailing list, we are currently in the process of 
> releasing Flink 1.4. This is going 
> to be a Hadoop-free distribution, which means that it should work with any 
> Hadoop version, including Hadoop 2.9.0.
> 
> Given this, I would recommend trying out the release candidate (which will 
> hopefully become the next official Flink release), 
> and it should work just fine!
> 
> Hope this helps,
> Kostas
> 
>> On Nov 29, 2017, at 10:37 AM, ORIOL LOPEZ SANCHEZ 
>> <oriol.lopezsanc...@telefonica.com 
>> <mailto:oriol.lopezsanc...@telefonica.com>> wrote:
>> 
>> Hi, I'm currently working on designing a data-processing cluster, and one of 
>> the distributed processing tools we want to use is Flink.
>> 
>> As we're creating our cluster from barebones, without relying on any Hadoop 
>> distributions such as Hortonworks or Cloudera, we want to use Flink with 
>> Hadoop 2.9.0, but it's not officially supported.
>> 
>> May I know if there are plans to support Hadoop 2.9.0 on a near future?
>> 
>> Thank you very much,
>> 
>> Oriol López Sánchez.
>> 
>> 

Re: Are there plans to support Hadoop 2.9.0 on near future?

2017-11-29 Thread Kostas Kloudas
Hi Oriol,

As you may have seen from the mailing list, we are currently in the process of 
releasing Flink 1.4. This is going 
to be a Hadoop-free distribution, which means that it should work with any 
Hadoop version, including Hadoop 2.9.0.

Given this, I would recommend trying out the release candidate (which will 
hopefully become the next official Flink release), 
and it should work just fine!

Hope this helps,
Kostas

> On Nov 29, 2017, at 10:37 AM, ORIOL LOPEZ SANCHEZ 
>  wrote:
> 
> Hi, I'm currently working on designing a data-processing cluster, and one of 
> the distributed processing tools we want to use is Flink.
> 
> As we're creating our cluster from barebones, without relying on any Hadoop 
> distributions such as Hortonworks or Cloudera, we want to use Flink with 
> Hadoop 2.9.0, but it's not officially supported.
> 
> May I know if there are plans to support Hadoop 2.9.0 on a near future?
> 
> Thank you very much,
> 
> Oriol López Sánchez.
> 
> 



Re: Bad entry in block exception with RocksDB

2017-11-23 Thread Kostas Kloudas
Hi Kien,

Could you share some more information about your job?
What operators are you using, the format of your elements, etc?

Thanks,
Kostas

> On Nov 23, 2017, at 2:23 AM, Kien Truong  wrote:
> 
> Hi,
> 
> We are seeing this exception in one of our job, whenever a check point or 
> save point is performed.
> 
> java.lang.RuntimeException: Error while adding data to RocksDB
> at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.add(RocksDBListState.java:119)
> at 
> org.apache.flink.runtime.state.UserFacingListState.add(UserFacingListState.java:50)
> 
> Caused by: org.rocksdb.RocksDBException: bad entry in block
> at org.rocksdb.RocksDB.merge(Native Method)
> at org.rocksdb.RocksDB.merge(RocksDB.java:683)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBListState.add(RocksDBListState.java:116)
> 
> We are using Flink 1.3.2 with incremental checkpoint. Any idea what might be 
> causing this ?
> 
> Best regards,
> Kien



Re: readFile, DataStream

2017-11-13 Thread Kostas Kloudas
Hi Juan,

The problem is that once a file for a certain timestamp is processed and the 
global modification timestamp is modified, 
then all files for that timestamp are considered processed.

The solution is not to remove the = from the modificationTime <= 
globalModificationTime check in the ContinuousFileMonitoringFunction, as this 
would lead to duplicates. 
The solution, in my opinion, is to keep a list of the filenames (or hashes) of 
the files processed for the last globalModTimestamp (and only for that 
timestamp) and, when new files arrive with the same timestamp, check whether 
their names are already in that list. 

This way you pay a bit of memory but you get what you want.
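
To make the idea concrete, a rough sketch (illustrative only, not the actual 
ContinuousFileMonitoringFunction code):

class ProcessedFilesTracker {
    private long globalModificationTime = Long.MIN_VALUE;
    private final java.util.Set<String> processedForMaxTimestamp = new java.util.HashSet<>();

    boolean shouldProcess(String fileName, long modificationTime) {
        if (modificationTime < globalModificationTime) {
            return false;                               // strictly older: already covered
        }
        if (modificationTime > globalModificationTime) {
            globalModificationTime = modificationTime;  // newer timestamp: reset the set
            processedForMaxTimestamp.clear();
        }
        // same (current) timestamp: process only if this file name has not been seen yet
        return processedForMaxTimestamp.add(fileName);
    }
}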

What do you think?

Thanks,
Kostas

> On Nov 10, 2017, at 12:54 PM, Juan Miguel Cejuela  wrote:
> 
> Hi there,
> 
> I’m trying to watch a directory for new incoming files (with 
> StreamExecutionEnvironment#readFile) with a subsecond latency (interval watch 
> of ~100ms, and using the flag FileProcessingMode.PROCESS_CONTINUOUSLY).
> 
> If many files come in within (under) the interval watching time, flink 
> doesn’t seem to get notice of the files, and as a result, the files do not 
> get processed. The behavior also seems undeterministic, as it likely depends 
> on timeouts and so on. For example, 10 new files come in immediately (that 
> is, essentially in parallel) and perhaps 3 files get processed, but the rest 
> 7 don’t.
> 
> I’ve extended and created my own FileInputFormat, in which I don’t do much 
> more than log, in the open function, when a new file comes in. That’s how I 
> know that many files get lost.
> 
> On the other hand, when I restart flink, all the files in the directory are 
> immediately processed. This is the expected behavior and works fine.
> 
> The situation of unprocessed files is a bummer.
> 
> Am I doing something wrong? Do I need to set something in the configuration? 
> Is it a bug in Flink?
> 
> Hopefully I described my problem clearly.
> 
> Thank you.
> 



Re: Queryable State Python

2017-11-10 Thread Kostas Kloudas
Hi Martin,

I will try to reply to your questions inline:

> On Nov 10, 2017, at 1:59 PM, Martin Eden  wrote:
> 
> Hi,
> 
> Our team is looking at replacing Redis with Flink's own queryable state 
> mechanism. However our clients are using python.
> 
> 1. Is there a python integration with the Flink queryable state mechanism?
> Cannot seem to be able to find one.

There is no Python API for queryable state. Currently only Java is supported.

> 2. If not, is it on the roadmap?

I am not aware of any efforts towards that direction, although there are 
discussions about porting queryable state to REST, so 
that more clients (in any language) can be written.
> 
> 3. Our current solution is to write a Java RPC proxy and query that from 
> python. The whole point of using queryable state was to get rid of an extra 
> service (Redis) but now it seems we need to add another one. Is there an 
> easier way to call queryable state from Python without requiring an extra 
> service?

Unfortunately for now I am not aware of any easier way to do so.

> 
> 4. Is queryable state used in production by anyone? Can anyone share numbers, 
> experiences, case studies?

Queryable state is currently under heavy development. So APIs may change and 
features may be added. 
For queryable state users, I would recommend checking out the talks in previous 
Flink Forward editions. They are all on-line. 

> 
> 5. What is the direction that queryable state is going in for the next Flink 
> release? Features, api?

The next release is going to come soon (it is currently under testing), so you 
can already have a look at how Queryable State 
is going to look if you check out the current release-1.4 branch of Flink 
on GitHub.

> 
> 6. Is the Flink queryable state going to be supported/developed going forward 
> with the advent of Pravega which has caching like capabilities as well?

For this I am cc’ing Stephan. Probably he is more informed.

Hope this helps,
Kostas

> Thanks,
> M



Re: When using Flink for CEP, can the data in Cassandra database be used for state

2017-11-09 Thread Kostas Kloudas
Hi Shyla,

Happy to hear that you are experimenting with CEP!

For enriching your input stream with data from Cassandra (or whichever external 
storage system) you could use:
* either the AsyncIO functionality offered by Flink 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html),
* or, iff all your database fits in memory, you could write a ProcessFunction 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html) 
which loads the database in memory in the open() method, and then uses the 
data accordingly.

Afterwards, you can use the resulting (enriched) DataStream to feed it into CEP 
for further processing.
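
For the second option, a rough sketch of what the enrichment step could look like 
("Event", "UserProfile" and "EnrichedEvent" are placeholders for your own types, and 
the Cassandra read is only indicated in a comment since it depends on the driver you use):

public class EnrichWithProfiles extends RichMapFunction<Event, EnrichedEvent> {

    private transient Map<String, UserProfile> profiles;

    @Override
    public void open(Configuration parameters) throws Exception {
        profiles = new HashMap<>();
        // load the user profiles from Cassandra once, when the task starts, e.g.:
        // for (Row row : session.execute("SELECT * FROM user_profiles")) { profiles.put(...); }
    }

    @Override
    public EnrichedEvent map(Event event) {
        UserProfile profile = profiles.get(event.getUserId());
        return new EnrichedEvent(event, profile);  // the resulting stream can then be fed into CEP
    }
}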

Hope this helps!
Kostas

> On Nov 9, 2017, at 12:08 AM, shyla deshpande  wrote:
> 
> Hello all,
> 
> I am new to Flink.
> 
> We have our data in Cassandra database. We have a use case for CEP. 
> I am checking out if Flink fits well for us.  
> 
> When processing the event data, I may want to pull data for the cassandra 
> database like the user profile and join with the event data.
> 
> Is there a way to do that?  I appreciate your help. 
> 
> Thanks



Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Kostas Kloudas
Perfect! thanks a lot!

Kostas

> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio 
> <federico.dambro...@smartlab.ws> wrote:
> 
> Hi Kostas, 
> 
> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
> 
> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>>:
> Hi Federico,
> 
> I assume that you are using Flink 1.3, right?
> 
> In this case, in 1.4 we have fixed a bug that seems similar to your case:
> https://issues.apache.org/jira/browse/FLINK-7756 
> 
> Could you try the current master to see if it fixes your problem?
> 
> Thanks,
> Kostas
> 
>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio 
>> <federico.dambro...@smartlab.ws <mailto:federico.dambro...@smartlab.ws>> 
>> wrote:
>> 
>>  Could not find id for entry:
>> 
> 
> 
> 
> 
> -- 
> Federico D'Ambrosio



Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Kostas Kloudas
Hi Federico,

I assume that you are using Flink 1.3, right?

In this case, in 1.4 we have fixed a bug that seems similar to your case:
https://issues.apache.org/jira/browse/FLINK-7756 


Could you try the current master to see if it fixes your problem?

Thanks,
Kostas

> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio 
>  wrote:
> 
>  Could not find id for entry: 
>



Re: FlinkCEP: pattern application on a KeyedStream

2017-10-19 Thread Kostas Kloudas
Hi Federico,

If I understand your question correctly, then yes, the application of a Pattern 
on a keyed stream 
is similar to the application of a map function.

It will search for the pattern on each per-key stream of data.
So there will be state (buffer with partial matches, queued elements, etc) for 
every active key.

Cheers,
Kostas

> On Oct 19, 2017, at 11:55 AM, Federico D'Ambrosio 
>  wrote:
> 
> Hi all,
> 
> I was wondering if it is correct to assume the application of a pattern on a 
> KeyedStream similar to the application, e.g., of a MapFunction when it comes 
> to state.
> 
> For example, the following
> 
> val pattern = ...
> val keyedStream = stream.keyBy("id")
> 
> val patternKeyedStream = CEP.pattern(pattern, keyedStream)
> 
> val anotherKeyedStream = patternKeyedStream.select(...)
> 
> should only check the pattern on each single partition value.
> 
> Am I correct in assuming this, or I have misunderstood CEP functioning?
> 
> -- 
> Federico D'Ambrosio



Re: async io operator timeouts

2017-10-10 Thread Kostas Kloudas
Perfect! Thanks a lot Karthik.

> On Oct 10, 2017, at 10:41 AM, Karthik Deivasigamani <karthi...@gmail.com> 
> wrote:
> 
> Thanks Kostas. 
> Here is the JIRA : https://issues.apache.org/jira/browse/FLINK-7789 
> 
> ~
> Karthik
> 
> On Mon, Oct 9, 2017 at 7:12 PM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Hi Karthik,
> 
> Currently there is no way to provide a handler for timed-out requests.
> So the behavior is exactly what you described. A request fails, an exception 
> is thrown and
> the job is restarted.
> 
> A handler would be a nice addition. If you want, you can open a JIRA about it 
> and if would like
> to work on it, feel free to submit a PR.
> 
> Thanks, 
> Kostas
>  
>> On Oct 6, 2017, at 4:57 PM, Karthik Deivasigamani <karthi...@gmail.com 
>> <mailto:karthi...@gmail.com>> wrote:
>> 
>> Hi,
>>Is there a way to catch the timeouts thrown from async io operator?
>> We use async io API to make some high latency HTTP API calls. Currently when 
>> the underlying http connection hangs and fails to timeout in the configured 
>> time the async timeout kicks in and throws an exception which causes the job 
>> to restart. Is there a way to catch this exception in application code? We 
>> are apache flink 1.3.1
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>>  
>> ~
>> Karthik
> 
> 



Re: serialization error when using multiple metrics counters

2017-10-09 Thread Kostas Kloudas
Hi Colin,

Are you initializing your counters from within the open() method of your rich 
function?
In other words, are you calling 

counter = getRuntimeContext.getMetricGroup.counter(“my counter”) 

from within the open().

The counter interface is not serializable. So if you instantiate the counters 
outside the open(), 
when Flink tries to ship your code to the cluster, it cannot, and you get the 
exception.
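
In Java, a minimal sketch of the open()-based initialization could look like this 
(the class and counter names just mirror the ones from your snippet):

public class ParsedResultUnwrapper extends RichMapFunction<String, String> {

    private transient Counter successCounter;
    private transient Counter failedCounter;
    private transient Counter errorCounter;

    @Override
    public void open(Configuration parameters) {
        successCounter = getRuntimeContext().getMetricGroup().counter("successfulParse");
        failedCounter  = getRuntimeContext().getMetricGroup().counter("failedParse");
        errorCounter   = getRuntimeContext().getMetricGroup().counter("errorParse");
    }

    @Override
    public String map(String value) {
        successCounter.inc(); // increment whichever counter applies to the record
        return value;
    }
}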

You can have a look at the docs for an example:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
 


Thanks,
Kostas

> On Oct 7, 2017, at 11:34 PM, Colin Williams 
>  wrote:
> 
> I've created a RichMapFunction in scala with multiple counters like:
> 
>lazy val successCounter = 
> getRuntimeContext.getMetricGroup.counter("successfulParse")
>lazy val failedCounter = 
> getRuntimeContext.getMetricGroup.counter("failedParse")
>lazy val errorCounter = 
> getRuntimeContext.getMetricGroup.counter("errorParse")
> 
> which I increment in the map function. While testing I noticed that I have no 
> issues with using a single counter. However with multiple counters I get a 
> serialization error using more than one counter.
> 
> Does anyone know how I can use multiple counters from my RichMapFunction, or 
> what I'm doing wrong?
> 
> [info]   org.apache.flink.api.common.InvalidProgramException: The 
> implementation of the RichMapFunction is not serializable. The object 
> probably contains or references non serializable fields.
> [info]   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info]   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info]   at 
> org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info]   at 
> ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
> [info]   at 
> ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   ...
> [info]   Cause: java.io.NotSerializableException: 
> org.apache.flink.metrics.SimpleCounter
> [info]   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info]   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info]   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info]   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info]   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info]   at 
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info]   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info]   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info]   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   ...
> [info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a 
> Error -> ParseResult[LineProtocol] *** FAILED ***
> [info]   org.apache.flink.api.common.InvalidProgramException: The 
> implementation of the RichMapFunction is not serializable. The object 
> probably contains or references non serializable fields.
> [info]   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info]   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info]   at 
> org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info]   at 
> ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
> [info]   at 
> ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   ...
> [info]   Cause: java.io.NotSerializableException: 
> org.apache.flink.metrics.SimpleCounter
> [info]   at 
> 

Re: Bucketing/Rolling Sink: How to overwrite method "openNewPartFile" - to append a new timestamp to part file path every time a new part file is being created

2017-10-09 Thread Kostas Kloudas
Hi Raja,

To know about the method, I suppose you have looked at the source code of the 
sink.
I think that including the timestamp of the element in the part file path is not as 
easy as overriding the openNewPartFile.

The reason is that the filenames serve as identities for the associated state 
of the bucket, and this check looks for 
complete equality of the filename, rather than “contains()”, when checking for 
partial filenames to transition from the 
pending to the finished state.

A way to bypass this is to write each element's timestamp along with the element, so that 
when you check the content of a 
file, you can see the timestamp of its first element. You will have to write 
more data though.
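
For illustration, a rough sketch of that idea ("Event" and getTimestamp() stand in 
for your own type):

DataStream<String> withTimestamps = events.map(new MapFunction<Event, String>() {
    @Override
    public String map(Event value) {
        // prepend the element's timestamp, so the first line of a part file reveals its start time
        return value.getTimestamp() + "\t" + value.toString();
    }
});
withTimestamps.addSink(new BucketingSink<String>("/base/path"));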

Does this fit your needs?

Kostas

> On Oct 6, 2017, at 11:02 PM, Raja.Aravapalli  
> wrote:
> 
>  
> Hi,
>  
> I want to overwrite the method “openNewPartFile” in the BucketingSink Class 
> such that it creates part file name with inclusion of timestamp whenever it 
> rolls a new part file.
>  
> Can someone share some thoughts on how I can do this. 
>  
> Thanks a ton, in advance. 
>  
>  
> Regards,
> Raja.



Re: Windowed Stream Queryable State Support

2017-10-08 Thread Kostas Kloudas
Hi Vijay,

If by “Windowed Stream Queryable State Support” you mean when will Flink allow 
to query 
the state of an in-flight window, then a version will be available in 1.4 yes.

Cheers,
Kostas

> On Oct 7, 2017, at 2:55 PM, vijayakumar palaniappan  
> wrote:
> 
> What is the state of Windowed Stream Queryable State Support?
> 
> Is it available in 1.3 or planned for 1.4?
> 
> Thanks
> Vijay
> 



Re: Issue with CEP library

2017-09-28 Thread Kostas Kloudas
Hi Ajay,

After reading all the data from your source, could you somehow tell your 
sources to send 
a watermark of Long.MaxValue (or a high value)?

I am asking this just to see if the problem is that the data is simply 
buffered inside Flink because 
there is a problem with the timestamps and the watermarks.
You could also see this from the WebUI, by looking at the size of your 
checkpointed state.
If the size increases, it means that something is stored there.

I will also have a deeper look.

Kostas

> On Sep 28, 2017, at 5:17 PM, Ajay Krishna <ajaykris...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> Thank you for reaching out and for the suggestions. Here are the results
> 
> 1. Using an env parallelism of 1 performed similar with the additional 
> problem that there was significant lag in the kafka topic
> 2. I removed the additional keyBy(0) but that did not change anything
> 3. I also tried only to check for the start only pattern and it was exactly 
> the same where I saw one of the homes going through but 3 others just getting 
> dropped. 
> 4. I also tried slowing down the rate from 5000/second into Kafka to about 
> 1000/second but I see similar results. 
> 
> I was wondering if you had any other solutions to the problem. I am specially 
> concerned about 1 and 3. Is this library under active development ? Is there 
> a JIRA open on this issue and could be open one to track this ? 
> 
> 
> I was trying to read on Stackoverflow and found a user who had a very similar 
> issue in Aug'16. So I also contacted him to discuss the issue and learnt 
> that the pattern of failure was exactly the same. 
> 
> https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic 
> 
> 
> Before I found the above post, I created a post for this issue
> https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern
>  
> 
> 
> 
> I would really appreciate your guidance on this. 
> 
> Best regards,
> Ajay
> 
> 
> 
> 
> 
> On Thu, Sep 28, 2017 at 1:38 AM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Hi Ajay,
> 
> I will look a bit more on the issue.
> 
> But in the meantime, could you run your job with a parallelism of 1, to see if 
> the results are the expected ones?
> 
> Also, could you change the pattern, for example to check only for the start, to 
> see if all keys pass through?
> 
> As for the code, you apply keyBy(0) on the cepMap stream twice, which is 
> redundant and introduces latency. 
> You could remove that to also see the impact.
> 
> Kostas
> 
>> On Sep 28, 2017, at 2:57 AM, Ajay Krishna <ajaykris...@gmail.com 
>> <mailto:ajaykris...@gmail.com>> wrote:
>> 
>> Hi, 
>> 
>> I've only been working with Flink for the past 2 weeks on a project and am 
>> trying to use the CEP library on sensor data. I am using Flink version 1.3.2. 
>> Flink has a kafka source. I am using KafkaSource9. I am running Flink on a 3 
>> node AWS cluster with 8G of RAM running Ubuntu 16.04. From the flink 
>> dashboard, I see that I have 2 Taskmanagers & 4 Task slots
>> 
>> What I observe is the following. The input to Kafka is a json string and 
>> when parsed on the flink side, it looks like this
>> 
>> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
>> event,High,37.75142,-122.39458,12.0,20.0)
>> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
>> time characteristic is set to EventTime and I have an 
>> AscendingTimestampExtractor using the timestamp field. I have parallelism 
>> for the execution environment is set to 4. I have a rather simple event that 
>> I am trying to capture
>> 
>> DataStream<Tuple8<Integer,Date,String,String,Float,Float,Float, Float>> 
>> cepMapByHomeId = cepMap.keyBy(0);
>> 
>> //cepMapByHomeId.print();
>> 
>> 
>> Pattern<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>, ?> cep1 =
>> 
>> Pattern.<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>>begin("start")
>> .where(new OverLowThreshold())
>> .followedBy("end")
>> .where(new OverHighThreshold());
>> 
>> 
>> PatternStream<Tuple8<Integer, Date, String, String, Float, 
>> Float, Float, 

Re: Issue with CEP library

2017-09-28 Thread Kostas Kloudas
Hi Ajay,

I will look a bit more on the issue.

But in the meantime, could you run your job with a parallelism of 1, to see if 
the results are the expected ones?

Also, could you change the pattern, for example to check only for the start, to see 
if all keys pass through?

As for the code, you apply keyBy(0) on the cepMap stream twice, which is redundant 
and introduces latency. 
You could remove that to also see the impact.
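
For illustration, based on the code you posted, the PatternStream could be created 
directly on the already keyed stream:

// cepMapByHomeId is already keyed by field 0, so the extra keyBy(0) can be dropped:
PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
        CEP.pattern(cepMapByHomeId, cep1);
// instead of: CEP.pattern(cepMapByHomeId.keyBy(0), cep1)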

Kostas

> On Sep 28, 2017, at 2:57 AM, Ajay Krishna  wrote:
> 
> Hi, 
> 
> I've only been working with Flink for the past 2 weeks on a project and am 
> trying to use the CEP library on sensor data. I am using Flink version 1.3.2. 
> Flink has a kafka source. I am using KafkaSource9. I am running Flink on a 3 
> node AWS cluster with 8G of RAM running Ubuntu 16.04. From the flink 
> dashboard, I see that I have 2 Taskmanagers & 4 Task slots
> 
> What I observe is the following. The input to Kafka is a json string and when 
> parsed on the flink side, it looks like this
> 
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> I use a Tuple8 to capture the parsed data. The first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. I have parallelism for 
> the execution environment is set to 4. I have a rather simple event that I am 
> trying to capture
> 
> DataStream<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>> 
> cepMapByHomeId = cepMap.keyBy(0);
> 
> //cepMapByHomeId.print();
> 
> 
> Pattern<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>, ?> cep1 =
> 
> Pattern.<Tuple8<Integer,Date,String,String,Float,Float,Float,Float>>begin("start")
> .where(new OverLowThreshold())
> .followedBy("end")
> .where(new OverHighThreshold());
> 
> 
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream = CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
> 
> 
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts = patternStream.select(new PackageCapturedEvents());
> The pattern checks if the 7th field in the tuple8 goes over 12 and then over 
> 16. The output of the pattern is like this
> 
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> On the Kafka producer side, I am trying send simulated data for around 100 
> homes, so the home_id would go from 0-100 and the input is keyed by home_id. 
> I have about 10 partitions in kafka. The producer just loops going through a 
> csv file with a delay of about 100 ms between 2 rows of the csv file. The 
> data is exactly the same for all 100 of the csv files except for home_id and 
> the lat & long information. The timestamp is incremented by a step of 1 sec. 
> I start multiple processes to simulate data form different homes.
> 
> THE PROBLEM:
> 
> Flink completely misses capturing events for a large subset of the input 
> data. I barely see the events for about 4-5 of the home_id values. I do a 
> print before applying the pattern and after and I see all home_ids before and 
> only a tiny subset after. Since the data is exactly the same, I expect all 
> homeid to be captured and written to my sink which is cassandra in this case. 
> I've looked through all available docs and examples but cannot seem to get a 
> fix for the problem.
> 
> I would really appreciate some guidance how to understand fix this.
> 
> 
> 
> Thank you,
> 
> Ajay
> 



Re: CustomPartitioner that simulates ForwardPartitioner and watermarks

2017-09-28 Thread Kostas Kloudas
Hi Yunus,

I see. Currently I am not sure that you can simply broadcast the watermark 
only, without 
having a shuffle.

But one thing to note about your algorithm: I am not sure that it solves 
the problem you encounter.

Your algorithm seems to prioritize the stream with the elements with the 
smallest timestamps,
rather than throttling fast streams so that slow ones can catch up.

Example: a task reading a Kafka partition that has elements with timestamps 1, 2, 3 
will emit watermark 3 (assuming an ascending timestamp extractor), while another 
task that reads 
another partition with elements with timestamps 5, 6, 7 will emit watermark 7. 
With your algorithm, 
if I get it right, you will throttle the second partition/task while allowing the 
first one to advance, although 
both read at the same pace (e.g. 3 elements per unit of time).

I will think a bit more on the solution. 

The sketches that I can come up with all introduce some latency, e.g. measuring 
throughput in taskA 
and sending it to a side output together with a taskId, then broadcasting the side 
output to a downstream operator 
which is something like a co-process function (taskB) that receives the original stream 
and the side output, and 
this is the one that checks whether “my task" is slow. 

As I said I will think on it a bit more,
Kostas

> On Sep 27, 2017, at 6:32 PM, Yunus Olgun <yunol...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> Yes, you have summarized well. I want to only forward the data to the next 
> local operator, but broadcast the watermark through the cluster.
> 
> - I can’t set parallelism of taskB to 1. The stream is too big for that. 
> Also, the data is ordered at each partition. I don’t want to change that 
> order.
> 
> - I don’t need KeyedStream. Also taskA and taskB will always have the same 
> parallelism with each other. But this parallelism can be increased in the 
> future.
> 
> The use case is: The source is Kafka. At our peak hours or when we want to 
> run the streaming job with old data from Kafka, always the same thing 
> happens. Even at trivial jobs. Some consumers consumes faster than others. 
> They produce too much data to downstream but watermark advances slowly at the 
> speed of the slowest consumer. This extra data gets piled up at downstream 
> operators. When the downstream operator is an aggregation, it is ok. But when 
> it is a in-Flink join; state size gets too big, checkpoints take much longer 
> and overall the job becomes slower or fails. Also it effects other jobs at 
> the cluster.
> 
> So, basically I want to implement a throttler. It compares timestamp of a 
> record and the global watermark. If the difference is larger than a constant 
> threshold it starts sleeping 1 ms for each incoming record. This way, fast 
> operators wait for the slowest one.
> 
> The only problem is that, this solution came at the cost of one network 
> shuffle and data serialization/deserialization. Since the stream is large I 
> want to avoid the network shuffle at the least. 
> 
> I thought operator instances within a taskmanager would get the same indexId, 
> but apparently this is not the case.
> 
> Thanks,
> 
>> On 27. Sep 2017, at 17:16, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> 
>> Hi Yunus,
>> 
>> I am not sure if I understand correctly the question.
>> 
>> Am I correct to assume that you want the following?
>> 
>>  ———> time
>> 
>>  ProcessAProcessB
>> 
>> Task1: W(3) E(1) E(2) E(5)   W(3) W(7) E(1) E(2) E(5)
>> 
>> Task2: W(7) E(3) E(10) E(6)  W(3) W(7) E(3) E(10) E(6)
>> 
>> 
>> In the above, elements flow from left to right and W() stands for watermark 
>> and E() stands for element.
>> In other words, between Process(TaksA) and Process(TaskB) you want to only 
>> forward the elements, but broadcast the watermarks, right?
>> 
>> If this is the case, a trivial solution would be to set the parallelism of 
>> TaskB to 1, so that all elements go through the same node.
>> 
>> One other solution is what you did, BUT by using a custom partitioner you 
>> cannot use keyed state in your process function B because the 
>> stream is no longer keyed.
>> 
>> A similar approach to what you did but without the limitation above, is that 
>> in the first processFunction (TaskA) you can append the 
>> taskId to the elements themselves and then do a keyBy(taskId) between the 
>> first and the second process function.
>> 
>> These are the solutions that I can come up with, assuming that you 

Re: StreamCorruptedException

2017-09-27 Thread Kostas Kloudas
Hi Sridhar,

From looking at your code:

1) The “KafkaDataSource” is a custom source that you implemented? Does this 
source buffer anything?
2) The getStreamSource2 seems to return again a "new 
KafkaDataSource”. Can this be a problem?
3) You are working in processing time and you are simply detecting whether 2 
messages of the same type came within 15 min, right? 
I suppose that this could also be implemented using the times() 
quantifier (see the sketch after this list), but this is just a matter of taste.
Could you reduce this to a smaller duration and see if you still get a 
corrupted stream exception?
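
A rough sketch of the times() alternative, just for illustration ("Message", its 
getType() and the type value are placeholders):

Pattern<Message, ?> pattern = Pattern.<Message>begin("same-type")
        .where(new SimpleCondition<Message>() {
            @Override
            public boolean filter(Message msg) {
                return "SOME_TYPE".equals(msg.getType());
            }
        })
        .times(2)
        .within(Time.minutes(15));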

Thanks,
Kostas

> On Sep 27, 2017, at 5:42 AM, Sridhar Chellappa  wrote:
> 
> One more point to add.
> 
> I disabled checkpoints (by commenting out code that calls 
> enableCheckpointing()) and re-ran the job this time with plenty of memory to 
> the job manager
> 
> ~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d
> 
> At the Jobmanager, I am still hitting:
> 
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Starting 
> YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2, 
> Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Current 
> user: flink
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  JVM: OpenJDK 
> 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Maximum heap 
> size: 16384 MiBytes
> 2017-09-25 06:46:44,066 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  JAVA_HOME: 
> /usr/lib/jvm/java-8-openjdk-amd64
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Hadoop 
> version: 2.7.2
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  JVM Options:
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner - -Xmx18432m
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner - 
> -Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_01/jobmanager.log
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner - 
> -Dlogback.configurationFile=file:logback.xml
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner - 
> -Dlog4j.configuration=file:log4j.properties
> 2017-09-25 06:46:44,067 INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner -  Program 
> Arguments: (none)
> 
>   
>.
>   
>.
> 
> 2017-09-25 06:50:51,925 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map 
> -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) 
> switched from DEPLOYING to RUNNING.
> 2017-09-25 13:38:54,175 INFO  org.apache.flink.runtime.blob.BlobCache 
>   - Created BLOB cache storage directory 
> /tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97
> 2017-09-25 13:38:54,187 INFO  org.apache.flink.runtime.blob.BlobCache 
>   - Downloading 49efe0ad58b727ba145b86df6088111c9a90ddd6 from 
> localhost/127.0.0.1:0 
> 2017-09-25 16:30:39,974 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- 
> KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a) 
> switched from RUNNING to CANCELED.
> 2017-09-25 16:30:39,975 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map 
> -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) 
> switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown 
> Source)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at 
> com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
> at 
> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
> at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
> at 
> 

Re: the design of spilling to disk

2017-09-19 Thread Kostas Kloudas
Hi Florin,

Unfortunately, there is no design document.

The UnilateralSortMerger.java is used in the batch processing mode (not in 
streaming) and, 
in fact, the code dates back some years. I also cc Fabian, as he may have more 
things to say on this.

Now for the streaming side, Flink uses 3 state-backends, in-memory (no spilling 
and mainly useful for testing),
filesystem and RocksDB (both eventually spill to disk but in different ways), 
and it also supports incremental 
checkpoints, i.e. at each checkpoint it only stores the diff between 
checkpoint[i] and checkpoint[i-1].
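
For illustration, enabling the RocksDB backend with incremental checkpoints looks 
roughly like this (the checkpoint path and interval are placeholders; the constructor 
declares an IOException):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // true = incremental
env.enableCheckpointing(60000);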

For more information on Flink state and state backends, check out the latest 
talk from Stefan Richter at 
Flink Forward Berlin 2017 (https://www.youtube.com/watch?v=dWQ24wERItM).

Cheers,
Kostas

> On Sep 19, 2017, at 6:00 PM, Florin Dinu  wrote:
> 
> Hello everyone,
> 
> In our group at EPFL we're doing research on understanding and potentially 
> improving the performance of data-parallel frameworks that use secondary 
> storage.
> I was looking at the Flink code to understand how spilling to disk actually 
> works.
> So far I got to the UnilateralSortMerger.java and its spill and reading 
> threads. I also saw there are some spilling markers used.
> I am curious if there is any design document available on this topic.
> I was not able to find much online.
> If there is no such design document I would appreciate if someone could help 
> me understand how these spilling markers are used.
> At a higher level, I am trying to understand how much data does Flink spill 
> to disk after it has concluded that it needs to spill to disk.
> 
> Thank you very much
> Florin Dinu



Re: Queryable State

2017-09-15 Thread Kostas Kloudas
Hi Navneeth,

If you increase the timeout, everything works ok?
I suppose from your config that you are running in standalone mode, right?

Any other information about the job (e.g. code and/or size of state being 
fetched) and 
the cluster setup that can help us pin down the problem, would be appreciated.

Thanks,
Kostas

> On Sep 13, 2017, at 7:12 PM, Navneeth Krishnan  
> wrote:
> 
> Hi,
> 
> I am sure I have provided the right job manager details because the 
> connection timeout ip is the task manager where the state is kept. I guess 
> the client is able to reach the job manager and figure out where the state 
> is. Also if I provide a wrong state name, I'm receiving unknown state 
> exception. I couldn't find why there is a timeout and a warning message is 
> logged in the job manager.
> 
> On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas  > wrote:
> Hi,
> 
> 
> are you sure your jobmanager is running and is accessible from the supplied
> hostname and port? If you can start up the FLink UI of the job which creates
> your queryable state, it should have the details of the job manager and the
> port to be used in this queryable client job.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 



Re: QueryableState - No KvStateLocation found for KvState instance

2017-09-13 Thread Kostas Kloudas
Hi,

As Biplob said this means that the JM cannot find the requested state.

The reasons can be one of those Biplob mentioned, but given that you said you are using
the FlinkMiniCluster, I assume you are testing. In this case, it can also
be that you start querying your state too soon after the job is submitted,
so the state is not yet there, as the job that creates it has not yet
started.

In this case you can retry the query when it fails until it succeeds
(assuming that none of the conditions that Biplob mentioned holds). If you
have access to the Flink code, you can check the tests in the
queryable state IT cases.
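
A rough sketch of such a retry loop (queryState() here is just a placeholder for 
whatever client call you use, not an actual Flink API):

byte[] serializedResult = null;
for (int attempt = 0; attempt < 30 && serializedResult == null; attempt++) {
    try {
        serializedResult = queryState(jobId, "word_sums", serializedKey);
    } catch (Exception e) {
        Thread.sleep(1000); // state not registered yet, wait and retry
    }
}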

Kostas

On Sep 13, 2017 4:45 PM, "Biplob Biswas"  wrote:

> Hi Hayden,
>
> From what I know, "No KvStateLocation found for KvState instance with name
> 'word_sums'" is exactly what it means. Your current job can't find the
> KVState instance. This could result due to a few reasons that I know of:
>
> 1. The jobID you supplied for the queryclient job is not equal to the jobID
> of the state creator job.
> 2. There is a typo in the name either when you are creating the state or
> when you are accessing the state.
> 3. You are connected to a completely different jobmanager and not where the
> state is.
>
> There could be more reasons, but these are the ones on top of myhead.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: BucketingSink never closed

2017-09-08 Thread Kostas Kloudas
Hi Flavio,

If I understand correctly, I think you bumped into this issue: 
https://issues.apache.org/jira/browse/FLINK-2646 


There is also a similar discussion on the BucketingSink here: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
 


Kostas

> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier  wrote:
> 
> Hi to all,
> I'm trying to test a streaming job but the files written by the BucketingSink 
> are never finalized (remains into the pending state).
> Is this caused by the fact that the job finishes before the checkpoint?
> Shouldn't the sink properly close anyway?
> 
> This is my code:
> 
>   @Test
>   public void testBucketingSink() throws Exception {
> final StreamExecutionEnvironment senv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> final StreamTableEnvironment tEnv = 
> TableEnvironment.getTableEnvironment(senv);
> senv.enableCheckpointing(5000);
> DataStream<String> testStream = senv.fromElements(//
> "1,aaa,white", //
> "2,bbb,gray", //
> "3,ccc,white", //
> "4,bbb,gray", //
> "5,bbb,gray" //
> );
> final RowTypeInfo rtf = new RowTypeInfo(
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO);
> DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
> 
>   private static final long serialVersionUID = 1L;
> 
>   @Override
>   public Row map(String str) throws Exception {
> String[] split = str.split(Pattern.quote(","));
> Row ret = new Row(3);
> ret.setField(0, split[0]);
> ret.setField(1, split[1]);
> ret.setField(2, split[2]);
> return ret;
>   }
> }).returns(rtf);
> 
> String columnNames = "id,value,state";
> final String dsName = "test";
> tEnv.registerDataStream(dsName, rows, columnNames);
> final String whiteAreaFilter = "state = 'white'";
> DataStream<Row> grayArea = rows;
> DataStream<Row> whiteArea = null;
> if (whiteAreaFilter != null) {
>   String sql = "SELECT *, (%s) as _WHITE FROM %s";
>   sql = String.format(sql, whiteAreaFilter, dsName);
>   Table table = tEnv.sql(sql);
>   grayArea = 
> tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>   DataStream<Row> nw = 
> tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
>   whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
> }
> Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
> 
> String datasetWhiteDir = "/tmp/bucket/white";
> BucketingSink<Row> whiteAreaSink = new 
> BucketingSink<>(datasetWhiteDir.toString());
> whiteAreaSink.setWriter(bucketSinkwriter);
> whiteAreaSink.setBatchSize(10);
> whiteArea.addSink(whiteAreaSink);
> 
> String datasetGrayDir = "/tmp/bucket/gray";
> BucketingSink<Row> grayAreaSink = new 
> BucketingSink<>(datasetGrayDir.toString());
> grayAreaSink.setWriter(bucketSinkwriter);
> grayAreaSink.setBatchSize(10);
> grayArea.addSink(grayAreaSink);
> 
> JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
> System.out.printf("Job took %s minutes", 
> jobInfo.getNetRuntime(TimeUnit.MINUTES));
>   }
> 
> 
> 
> 
> 
> 
> 
> public class RowCsvWriter extends StreamWriterBase<Row> {
>   private static final long serialVersionUID = 1L;
> 
>   private final String charsetName;
>   private transient Charset charset;
>   private String fieldDelimiter;
>   private String recordDelimiter;
>   private boolean allowNullValues = true;
>   private boolean quoteStrings = false;
> 
>   /**
>* Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to 
> convert strings to
>* bytes.
>*/
>   public RowCsvWriter() {
> this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, 
> CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>   }
> 
>   /**
>* Creates a new {@code StringWriter} that uses the given charset to 
> convert strings to bytes.
>*
>* @param charsetName Name of the charset to be used, must be valid input 
> for
>*{@code Charset.forName(charsetName)}
>*/
>   public RowCsvWriter(String charsetName, String fieldDelimiter, String 
> recordDelimiter) {
> this.charsetName = charsetName;
> this.fieldDelimiter = fieldDelimiter;
> this.recordDelimiter = recordDelimiter;
>   }
> 
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
> super.open(fs, path);
> 
> try {
>   this.charset = Charset.forName(charsetName);
> } catch (IllegalCharsetNameException ex) {
>   throw new IOException("The charset " + charsetName + " is not valid.", 
> ex);
> } catch 

Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
Hi Ant,

I think you are implementing the wrong Bucketer. 
This seems to be the one for the RollingSink which is deprecated. 
Is this correct?

You should implement the BucketingSink one, which is in the package:

org.apache.flink.streaming.connectors.fs.bucketing

That one requires the implementation of 1 method with signature:

Path getBucketPath(Clock clock, Path basePath, T element);

which, from what I understand of your requirements, gives you access 
to the element that you need.
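
For illustration, a rough sketch of such a Bucketer ("MyEvent" and getCategory() are 
placeholders for your own type and the property you want in the path):

public class EventFieldBucketer implements Bucketer<MyEvent> {
    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, MyEvent element) {
        // derive the bucket directory from a property of the element itself
        return new Path(basePath, element.getCategory());
    }
}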

Cheers,
Kostas

> On Aug 16, 2017, at 3:31 PM, ant burton <apburto...@gmail.com> wrote:
> 
> 
> Thanks Kostas,
> 
> I’m narrowing in on a solution:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/filesystem_sink.html
>  says "You can also specify a custom bucketer by using setBucketer() on a 
> BucketingSink. If desired, the bucketer can use a property of the element or 
> tuple to determine the bucket directory.”
> 
> BucketingSink<String> sink = new BucketingSink<String>("/base/path");
> sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
> Therefore I’ve created a skeleton class:
> 
> public class S3Bucketer implements Bucketer {
>   private static final long serialVersionUID = 1L;
> 
>   private final String formatString;
> 
>   public S3Bucketer() {
>   }
> 
>   private void readObject(ObjectInputStream in) {
>   in.defaultReadObject();
>   }
> 
>   public boolean shouldStartNewBucket(Path basePath, Path 
> currentBucketPath) {
>   return true;
>   }
> 
>   public Path getNextBucketPath(Path basePath) {
>   return new Path(basePath + 
> “/some-path-that-I-need-create-from-the-stream");
>   }
> }
> 
> my question now is how do I access the data stream from within the S3Bucketer 
> so that I can generate a filename based on the data with the data stream.
> 
> Thanks,
> 
>> On 16 Aug 2017, at 12:55, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> 
>> In the second link for the BucketingSink, you can set your 
>> own Bucketer using the setBucketer method. You do not have to 
>> implement your own sink from scratch.
>> 
>> Kostas
>> 
>>> On Aug 16, 2017, at 1:39 PM, ant burton <apburto...@gmail.com> wrote:
>>> 
>>> or rather 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>>> 
>>> 
>>>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com> 
>>>> wrote:
>>>> 
>>>> Hi Ant,
>>>> 
>>>> I think you can do it by implementing your own Bucketer.
>>>> 
>>>> Cheers,
>>>> Kostas
>>>> 
>>>> .
>>>>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com> wrote:
>>>>> 
>>>>> Hello,
>>>>> 
>>>>> Given 
>>>>> 
>>>>>   // Set StreamExecutionEnvironment
>>>>>   final StreamExecutionEnvironment env = 
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> 
>>>>>   // Set checkpoints in ms
>>>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> 
>>>>>   // Add source (input stream)
>>>>>   DataStream dataStream = StreamUtil.getDataStream(env, 
>>>>> params);
>>>>> 
>>>>> How can I construct the s3_filename from the content of the an event, it 
>>>>> seems that whenever I attempt this I either have access to an event or 
>>>>> access to .addSink but not both.
>>>>> 
>>>>>   dataStream.addSink(new BucketingSink("s3a://flink/" + 
>>>>> s3_filename));
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 



Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
In the second link for the BucketingSink, you can set your 
own Bucketer using the setBucketer method. You do not have to 
implement your own sink from scratch.

Kostas

> On Aug 16, 2017, at 1:39 PM, ant burton <apburto...@gmail.com> wrote:
> 
> or rather 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
> 
> 
>> On 16 Aug 2017, at 12:24, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> 
>> Hi Ant,
>> 
>> I think you can do it by implementing your own Bucketer.
>> 
>> Cheers,
>> Kostas
>> 
>> .
>>> On Aug 16, 2017, at 1:09 PM, ant burton <apburto...@gmail.com 
>>> <mailto:apburto...@gmail.com>> wrote:
>>> 
>>> Hello,
>>> 
>>> Given 
>>> 
>>>   // Set StreamExecutionEnvironment
>>>   final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> 
>>>   // Set checkpoints in ms
>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>>   // Add source (input stream)
>>>   DataStream dataStream = StreamUtil.getDataStream(env, params);
>>> 
>>> How can I construct the s3_filename from the content of the an event, it 
>>> seems that whenever I attempt this I either have access to an event or 
>>> access to .addSink but not both.
>>> 
>>> dataStream.addSink(new BucketingSink("s3a://flink/ 
>>> " + s3_filename));
>>> 
>>> 
>>> Thanks,
>>> 
>>> 
>>> 
>>> 
>> 
> 



Re: Access to datastream from BucketSink

2017-08-16 Thread Kostas Kloudas
Hi Ant,

I think you can do it by implementing your own Bucketer.

Cheers,
Kostas

.
> On Aug 16, 2017, at 1:09 PM, ant burton  wrote:
> 
> Hello,
> 
> Given 
> 
>// Set StreamExecutionEnvironment
>final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
>// Set checkpoints in ms
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 
>// Add source (input stream)
>DataStream dataStream = StreamUtil.getDataStream(env, params);
> 
> How can I construct the s3_filename from the content of the an event, it 
> seems that whenever I attempt this I either have access to an event or access 
> to .addSink but not both.
> 
>   dataStream.addSink(new BucketingSink("s3a://flink/" + 
> s3_filename));
> 
> 
> Thanks,
> 
> 
> 
> 



Re: FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-08-12 Thread Kostas Kloudas
Hi Daiqing,

I think Stefan is right and this will be fixed in the upcoming release.
Could you open a JIRA for it with the Exception that you posted here?
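
For some background on the contract Stefan refers to below: a stateful serializer must 
hand out a fresh instance from duplicate(), so that no two threads ever share its 
internal buffers. A generic illustration of the idea (made-up class, not the actual 
NFASerializer):

// Illustration only: why duplicate() must not return 'this' for a stateful class.
public class StatefulThingSerializer {

  // mutable scratch state -> unsafe to share between threads
  private final StringBuilder scratch = new StringBuilder();

  public String serialize(String value) {
    scratch.setLength(0);                     // reset the shared buffer
    return scratch.append('[').append(value).append(']').toString();
  }

  public StatefulThingSerializer duplicate() {
    // returning 'this' would let concurrent callers interleave on 'scratch';
    // every caller must get its own instance instead
    return new StatefulThingSerializer();
  }
}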

Thanks,
Kostas

> On Aug 12, 2017, at 10:05 AM, Stefan Richter  
> wrote:
> 
> Hi,
> 
> from a quick look, I would say this is likely a problem with the 
> NFASerializer: this class seems to be stateful, but its 'duplicate()' method 
> is simply returning 'this'. This means that code which relies on duplication 
> of serializers to shield against concurrent accesses can break, because 
> multiple threads can work on the same internal serializer state and corrupt 
> it. Will take a deeper look on Monday.
> 
> Best,
> Stefan
> 
>> On Aug 11, 2017, at 8:55 PM, Daiqing Li wrote:
>> 
>> Hi,
>> 
>> I am running Flink 1.3.1 on EMR. But I am getting this exception after 
>> running for a while.
>> 
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output 
>> watermark: 
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>  at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>  at 
>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Could not copy NFA.
>>  at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>>  at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>>  at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>>  at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>>  at 
>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>>  at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>>  at 
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>>  at 
>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>  at 
>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>  ... 7 more
>> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>>  at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>>  at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>>  at 
>> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>>  at 
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>>  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>>  at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>  at 
>> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>>  at 
>> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>>  at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>>  at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>>  ... 17 more
> 



Re: Flink CEP issues

2017-08-08 Thread Kostas Kloudas
Hi Daiqing,

Is it possible to share your job in order to reproduce the problem?
Or at least a minimal example. If you see from the JIRA, there is another 
user in https://issues.apache.org/jira/browse/FLINK-6321 
 who had 
a similar problem but we never managed to reproduce it.

Thanks,
Kostas

> On Aug 8, 2017, at 8:13 PM, Daiqing Li  wrote:
> 
> Hi guys,
> 
> We are using flink cep to detect pattern. Here is the exception we got:
> 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at org.apache.flink.streaming.runtime.io 
> .StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.lang.IllegalStateException: Could not find id for entry: 
> SharedBufferEntry(ValueTimeWrapper(, 1502205437762, 0), 
> [SharedBufferEdge(null, 1)], 1)
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>   at 
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
>   at 
> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
> 
> In our event class, I use UUID as hash code and implement equals method 
> properly. Anyone knows what is going on?
> 
> Best,
> Daiqing



Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release

2017-08-02 Thread Kostas Kloudas
+1

> On Aug 2, 2017, at 3:16 PM, Till Rohrmann  wrote:
> 
> +1
> 
> On Wed, Aug 2, 2017 at 9:12 AM, Stefan Richter 
> wrote:
> 
>> +1
>> 
>> On Jul 28, 2017, at 4:03 PM, Stephan Ewen wrote:
>> 
>> Seems like no one raised a concern so far about dropping the savepoint
>> format compatibility for 1.1 in 1.4.
>> 
>> Leaving this thread open for some more days, but from the sentiment, it
>> seems like we should go ahead?
>> 
>> On Wed, Jul 12, 2017 at 4:43 PM, Stephan Ewen  wrote:
>> 
>>> Hi users!
>>> 
>>> Flink currently maintains backwards compatibility for savepoint formats,
>>> which means that savepoints taken with Flink version 1.1.x and 1.2.x can be
>>> resumed in Flink 1.3.x
>>> 
>>> We are discussing how many versions back to support. The proposition is
>>> the following:
>>> 
>>> *   Suggestion: Flink 1.4.0 will be able to resume savepoints taken with
>>> version 1.3.x and 1.2.x, but not savepoints from version 1.1.x and 1.0.x*
>>> 
>>> 
>>> The reason for that is that there is a lot of code mapping between the
>>> completely different legacy format (1.1.x, not re-scalable) and the
>>> key-group-oriented format (1.2.x onwards, re-scalable). It would greatly
>>> help the development of state and checkpointing features to drop that old
>>> code.
>>> 
>>> Please let us know if you have concerns about that.
>>> 
>>> Best,
>>> Stephan
>>> 
>>> 
>> 
>> 



Re: data loss after implementing checkpoint

2017-07-31 Thread Kostas Kloudas
Hi Sridhar,

Stephan already covered the correct sequence of actions in order for your 
second program
to know its correct starting point.

As far as the active/inactive rules are concerned, as Nico pointed out you have 
to somehow 
store in the backend which rules are active and which are not upon 
checkpointing. If not, upon 
recovery your program will not be able to know which rules to apply and which 
to ignore.
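
As a rough, untested sketch of what I mean (MyEvent and getRuleId() are placeholders 
for your own types), the set of enabled rules could live in operator state via the 
ListCheckpointed interface, so that it is written on every checkpoint and restored on 
recovery:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.util.Collector;

public class RuleAwareFilter implements FlatMapFunction<MyEvent, MyEvent>,
    ListCheckpointed<String> {

  // which rules are currently active; updated by your control path (e.g. REST)
  private final Set<String> enabledRules = new HashSet<>();

  @Override
  public void flatMap(MyEvent event, Collector<MyEvent> out) {
    if (enabledRules.contains(event.getRuleId())) {
      out.collect(event);
    }
  }

  @Override
  public List<String> snapshotState(long checkpointId, long timestamp) {
    return new ArrayList<>(enabledRules);     // goes into the state backend
  }

  @Override
  public void restoreState(List<String> state) {
    enabledRules.clear();
    enabledRules.addAll(state);               // comes back after a failure
  }
}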

Hope this helps,
Kostas

> On Jul 31, 2017, at 10:27 PM, Stephan Ewen  wrote:
> 
> Maybe to clear up some confusion here:
> 
>   - Flink recovers from the latest checkpoint after a failure
> 
>   - If you stopping/cancelling a Flink job and submit the job again, it does 
> not automatically pick up the latest checkpoint. Flink does not know that the 
> second program is a continuation of the first program.
> 
>   - If you want to second program to resume from the last program, you need 
> to start it with the option to continue from checkpoint/savepoint and pass a 
> path to that checkpoint/savepoint:
>  
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#resuming-from-savepoints
>  
> 
> 
> Stephan
> 
> 
> On Mon, Jul 31, 2017 at 5:27 PM, Nico Kruber  > wrote:
> Hi Sridhar,
> sorry for not coming back to you earlier and tbh, I'm no expert on this field
> either.
> 
> I don't see this enabling/disabling of rules in the CEP library overview at
> [1]. How do you do this?
> 
> You'll probably have to create a stateful operator [2] to store this state in
> Flink. Maybe Kostas (cc'd) may shed some more light onto this topic or has
> some other workaround.
> 
> 
> Nico
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/
> cep.html 
> 
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/
> state.html 
> 
> 
> On Wednesday, 19 July 2017 06:38:17 CEST Sridhar Chellappa wrote:
> > A follow up question on this. I have a Complex Event processor implemented
> > using the CEP library (1.3.0). The CEP library runs a variety of rules that
> > are configured (enable/disable rule) VIA REST APIs.
> >
> > Now, if my application crashes and recovers (or is cancelled and
> > restarted), will my configuration(as to which rules are enabled) still
> > hold? or do I have to persist the info into a backend?
> >
> > On Mon, Jul 10, 2017 at 7:36 PM, Nico Kruber  > > wrote:
> > > Hi Aftab,
> > > looks like what you want is either an externalized checkpoint with
> > > RETAIN_ON_CANCELLATION mode [1] or a savepoint [2].
> > >
> > > Ordinary checkpoints are deleted when the job is cancelled and only serve
> > > as a
> > > fault tolerance layer in case something goes wrong, i.e. machines fail, so
> > > that the job can be restarted automatically based on the restart policy.
> > >
> > >
> > > Nico
> > >
> > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/ 
> > > 
> > > checkpoints.html
> > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/ 
> > > 
> > > savepoints.html
> > >
> > > On Monday, 10 July 2017 14:20:53 CEST Aftab Ansari wrote:
> > > > Hi,
> > > > I am new to flink. I am facing issue implementing checkpoint.
> > > >
> > > > checkpoint related code:
> > > >
> > > > long checkpointInterval = 5000;
> > > >
> > > >  StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
> > > >  //specify backend
> > > >  //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/"),
> > >
> > > true));
> > >
> > > > env.setStateBackend(new
> > > > FsStateBackend("file:///Users/aftabansari/flink-state/", true));
> > > >
> > > >  //enable checkpoint
> > > >  env.enableCheckpointing(checkpointInterval,
> > > >
> > > > CheckpointingMode.EXACTLY_ONCE);
> > > > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> > > >
> > > >
> > > > When I run the code, I can see flink-state being written in my local
> > > > machine. but when I stop the job , wait for a few minutes and restart
> > > > the
> > > > job, it does not pick up from the time it left but it starts from when I
> > > > started the job.
> > > >
> > > > Could you point out what i am doing wrong. I am testing it locally from
> > > > ideaIntellij. below is what i see from localhost. Any help would be
> > > > appreciated. Thanks
> > > > [image: Inline images 1]
> > > > Br,
> 
> 



Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Kostas Kloudas
The rules, or patterns supported by FlinkCEP are presented in the documentation 
link
I posted earlier.

Dynamically updating these patterns, is not supported yet, but there are 
discussions to 
add this feature soon.

If the rules you need are supported by the current version of FlinkCEP, then 
you can 
start right away. If not, you need to provide more details.
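
Just to make the above concrete, a "rule" in FlinkCEP today is a hard-coded pattern 
over a (usually keyed) stream, roughly like this untested 1.3-style sketch ("Event", 
getType() and keyedInput are placeholders):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
    .where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event e) {
        return "LOGIN_FAILED".equals(e.getType());
      }
    })
    .followedBy("second")
    .where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event e) {
        return "LOGIN_FAILED".equals(e.getType());
      }
    })
    .within(Time.minutes(10));

PatternStream<Event> matches = CEP.pattern(keyedInput, pattern);

The pattern itself is fixed at job submission time, which is exactly why dynamically 
updating it is the missing piece mentioned above.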

Kostas

> On Jun 23, 2017, at 3:54 PM, Sridhar Chellappa  wrote:
> 
> CEP



Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Kostas Kloudas
Hi all,

Currently there is an ongoing effort to integrate FlinkCEP with Flink's SQL API.
There is already an open FLIP for this:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
 
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-20:+Integration+of+SQL+and+CEP>

So, if there was an effort for integration of different 
libraries/tools/functionality as well, it 
would be nice to go a bit more into details on i) what is already there, ii) 
what is planned to be 
integrated for the SQL effort, and iii) what else is required, and consolidate 
the resources
available.

This will allow the community to move faster and with a clear roadmap.

Kostas

> On Jun 23, 2017, at 2:51 PM, Suneel Marthi <smar...@apache.org> wrote:
> 
> FWIW, here's an old Cloudera blog about using Drools with Spark.
> 
> https://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/
>  
> <https://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/>
> 
> It should be possible to invoke Drools from Flink in a similar way (I have 
> not tried it). 
> 
> It all depends on what the use case and how much of present Flink CEP 
> satisfies the use case before considering integration with more complex rule 
> engines.
> 
> 
> Disclaimer: I work for Red Hat
> 
> On Fri, Jun 23, 2017 at 8:43 AM, Ismaël Mejía <ieme...@gmail.com 
> <mailto:ieme...@gmail.com>> wrote:
> Hello,
> 
> It is really interesting to see this discussion because that was one
> of the questions on the presentation on CEP at Berlin Buzzwords, and
> this is one line of work that may eventually make sense to explore.
> 
> Rule engines like drools implement the Rete algorithm that if I
> understood correctly optimizes the analysis of a relatively big set of
> facts (conditions) into a simpler evaluation graph. For more details
> this is a really nice explanation.
> https://www.sparklinglogic.com/rete-algorithm-demystified-part-2/ 
> <https://www.sparklinglogic.com/rete-algorithm-demystified-part-2/>
> 
> On flink's CEP I have the impression that you define this graph by
> hand. Using a rule engine you could infer an optimal graph from the
> set of rules, and then this graph could be translated into CEP
> patterns.
> 
> Of course take all of this with a grain of salt because I am not an
> expert on both CEP or the Rete algorithm, but I start to see the
> connection of both worlds more clearly now. So if anyone else has
> ideas of the feasibility of this or can see some other
> issues/consequences please comment. I also have the impression that
> distribution is less of an issue because the rete network is
> calculated only once and updates are not 'dynamic' (but I might be
> wrong).
> 
> Ismaël
> 
> ps. I add Thomas in copy who was who made the question in the
> conference in case he has some comments/ideas.
> 
> 
> On Fri, Jun 23, 2017 at 1:48 PM, Kostas Kloudas
> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
> > Hi Jorn and Sridhar,
> >
> > It would be worth describing a bit more what these tools are and what are
> > your needs.
> > In addition, and to see what the CEP library already offers here you can
> > find the documentation:
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
> >  
> > <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html>
> >
> >
> > Thanks,
> > Kostas
> >
> > On Jun 23, 2017, at 1:41 PM, Jörn Franke <jornfra...@gmail.com 
> > <mailto:jornfra...@gmail.com>> wrote:
> >
> > Hello,
> >
> > It is possible, but with some caveats: Flink is a distributed system, but in
> > Drools the facts are only locally available. This may lead to strange effects
> > when rules update the fact base.
> >
> > Best regards
> >
> > On 23. Jun 2017, at 12:49, Sridhar Chellappa <flinken...@gmail.com 
> > <mailto:flinken...@gmail.com>> wrote:
> >
> > Folks,
> >
> > I am new to Flink.
> >
> > One of the reasons why I am interested in Flink is because of its CEP
> > library. Our CEP logic comprises of a set of complex business rules which
> > will have to be managed (Create, Update, Delete) by a bunch of business
> > analysts.
> >
> > Is there a way I can integrate other third party tools (Drools, OpenRules)
> > to let Business Analysts define rules and  execute them using Flink's CEP
> > library?
> >
> >
> 



Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Kostas Kloudas
Hi Jorn and Sridhar,

It would be worth describing a bit more what these tools are and what are your 
needs.
In addition, and to see what the CEP library already offers here you can find 
the documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html 



Thanks,
Kostas

> On Jun 23, 2017, at 1:41 PM, Jörn Franke  wrote:
> 
> Hello,
> 
> It is possible, but with some caveats: Flink is a distributed system, but in 
> Drools the facts are only locally available. This may lead to strange effects 
> when rules update the fact base.
> 
> Best regards 
> 
>> On 23. Jun 2017, at 12:49, Sridhar Chellappa  wrote:
>> 
>> Folks,
>> 
>> I am new to Flink.
>> 
>> One of the reasons why I am interested in Flink is because of its CEP 
>> library. Our CEP logic comprises of a set of complex business rules which 
>> will have to be managed (Create, Update, Delete) by a bunch of business 
>> analysts. 
>> 
>> Is there a way I can integrate other third party tools (Drools, OpenRules) 
>> to let Business Analysts define rules and  execute them using Flink's CEP 
>> library?



Re: Flink CEP not emitting timed out events properly

2017-06-20 Thread Kostas Kloudas
Are you sure that after incrementing the wm by 1sec, there is no 
element that will come with a timestamp smaller than this? Or,
that after 10sec of inactivity, no element will come with such a 
timestamp?

Kostas

> On Jun 20, 2017, at 4:18 PM, Biplob Biswas  wrote:
> 
>  currentMaxTimestamp = currentMaxTimestamp + 1000;



Re: Flink CEP not emitting timed out events properly

2017-06-20 Thread Kostas Kloudas
You are correct that elements are waiting until a watermark with a higher timestamp 
than theirs (or the pattern's timeout) arrives.

Now for the watermark emitter: i) how do you measure the 10 sec in processing time, 
and ii) by how much do you advance the watermark? If you advance it by a lot, then 
elements that arrive later may be considered late.

For the first question I suppose that you set the watermark interval to 10 sec 
and 
if you see that there were no elements in between, you consider it inactivity 
right?
How do you estimate that there were no elements? You have a flag in the emitter?

> On Jun 20, 2017, at 3:57 PM, Biplob Biswas  wrote:
> 
> Hi Kostas,
> 
> I have out-of-orderness of around 5 seconds from what I have observed but
> that too from events coming from a different topic. The initial topic
> doesn't have out-of-order events still I have added a generous time bound of
> 20 seconds. Still, I will try for a higher number just in order to check a
> bit more.
> 
> The second problem you suggested sounds more interesting because when I
> print my events which have been ingested I see all the events. It's just
> that those events are neither generating a match nor are generating an
> anomaly which felt a bit weird for me.
> 
> In what cases the elements could be buffered and waiting forever? I expected
> that even if they are buffered when the timeout happens I would get all the
> events which were waiting for a match, provided the watermark is reset and
> that I am doing by setting the autoWatermarkinterval and increasing the
> watermark when there are no new events and after every 10-second system
> time.
> 
> Regards,
> Biplob
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13857.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Flink CEP not emitting timed out events properly

2017-06-20 Thread Kostas Kloudas
Hi Biplob,

You are correct that only a higher watermark leads to discarded events.
Are you sure that your custom watermark emitter does not emit a high watermark?
E.g. your partition has elements that are far out-of-order.

In addition, are you sure that your elements are not simply buffered and 
waiting 
for the right watermark?

Kostas

> On Jun 20, 2017, at 2:03 PM, Biplob Biswas  wrote:
> 
> But if that's the case, I don't understand why some of my events are just
> lost. If the watermark which is used is the smallest one, then for them I either
> expect a match or a timed-out event. 
> 
> The only way I can imagine my events getting lost is higher watermark than
> the incoming event and thus that event is discarded as too late. 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13854.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Flink CEP not emitting timed out events properly

2017-06-16 Thread Kostas Kloudas
Hi Biplob,

If  you know what you want, you can always write your custom 
AssignerWithPeriodicWatermarks that does your job. If you want
to just increase the watermark, you could simply check if you have
received any elements and if not, emit a watermark with the timestamp
of the previous watermark + X.
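
A rough, untested sketch of such an assigner (all names here are made up; the "X" 
above is advanceStepMs):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAdvancingWatermarkAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

  private final long idleTimeoutMs;    // e.g. 10 seconds of inactivity
  private final long advanceStepMs;    // how far to push the watermark when idle

  private long maxSeenTimestamp = Long.MIN_VALUE;
  private long lastElementWallClock = Long.MIN_VALUE;
  private long currentWatermark = Long.MIN_VALUE;

  public IdleAdvancingWatermarkAssigner(long idleTimeoutMs, long advanceStepMs) {
    this.idleTimeoutMs = idleTimeoutMs;
    this.advanceStepMs = advanceStepMs;
  }

  @Override
  public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
    long ts = element.getTimestamp();                  // placeholder field
    maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
    lastElementWallClock = System.currentTimeMillis();
    return ts;
  }

  @Override
  public Watermark getCurrentWatermark() {
    // called every autoWatermarkInterval milliseconds
    boolean idle = lastElementWallClock > 0
        && System.currentTimeMillis() - lastElementWallClock > idleTimeoutMs;
    if (idle) {
      currentWatermark += advanceStepMs;               // keep event time moving
    } else if (maxSeenTimestamp > Long.MIN_VALUE) {
      currentWatermark = Math.max(currentWatermark, maxSeenTimestamp);
    }
    return new Watermark(currentWatermark);
  }
}

Just be careful with how far you advance it: elements arriving with timestamps behind 
the bumped watermark will be considered late.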

Kostas

> On Jun 16, 2017, at 3:28 PM, Biplob Biswas  wrote:
> 
> Hi Kostas,
> 
> Thanks for the reply, makes things a bit more clear.
> 
> Also, I went through this link and it is something similar I am trying to
> observe. 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Listening-to-timed-out-patterns-in-Flink-CEP-td9371.html
> 
> I am checking for timed out events and when I am using event time, its
> behaviour is non-deterministic. For one pattern it's generating a few
> 'matched events' and for a different pattern no 'matched events'. And almost
> no timedout events in any of the patterns unless I run the series of mock
> events a second time, during which I get a series of anomaly events. 
> 
> I had a topic created with this issue but I didn't get any satisfactory
> solutions there, so was testing it with processing time whether it works
> even or not. 
> 
> https://gist.github.com/revolutionisme/cf675ceee1492b93be020d4526bc9d38
> https://gist.github.com/revolutionisme/38578e631f7a15f02cb2488f9fe56c76
> 
> I would really like to know how to increment the watermark without any
> events coming in, such that at least the timedout events are emitted by the
> system. 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794p13800.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Flink CEP not emitting timed out events properly

2017-06-16 Thread Kostas Kloudas
Hi Biplob,

With processing time there are no watermarks in the stream. 
The problem that you are seeing is because in processing time, the CEP
library expects the “next” element to come, in order to investigate if 
some of the patterns have timed-out.

Kostas

> On Jun 16, 2017, at 1:29 PM, Biplob Biswas  wrote:
> 
> Hi,
> 
> I am having some issues with FlinkCEP again. This time I am using processing
> time for my CEP job where I am reading from multiple kafka topics and using
> the pattern API to create a rule. I am outputting both, the matched events
> as well as timeout events.
> 
> Now my problem is, I am sending some event over one of the topics such that
> subsequent events wouldn't be generated within the time specified and I
> expect a timed out event.
> 
> But it is not generating the timed out event even after 2 minutes (specified
> interval) and it's only generating the previous timed out events when I am
> sending an extra message over the kafka topic. 
> 
> I am not sure why is this happening, for example:
> 
> 2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da2 []
> 2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da1 []
> 2> RecordReadEventType 1483278179000 044023a4-edec-439c-b221-806740972da5 []
> 2> Anomaly Events: {first=[RecordReadEventType 1483278179000
> 044023a4-edec-439c-b221-806740972da2 []]} @ 1497612386342
> 2> Anomaly Events: {first=[RecordReadEventType 1483278179000
> 044023a4-edec-439c-b221-806740972da1 []]} @ 1497612386342
> 
> in the example above, the anomaly events are generated only after sending
> the event with event id - 044023a4-edec-439c-b221-806740972da5 
> 
> and that too the anomaly event for this particular event is not generated.
> 
> I suspected that the watermark was not updated automatically for the last
> event and it's only updated when a new event comes in the system. So, I
> added the setAutoWatermarkInterval(1000) to the code but no avail.
> 
> 
> Thanks & Regards,
> Biplob Biswas
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-not-emitting-timed-out-events-properly-tp13794.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Java 8 lambdas for CEP patterns won't compile

2017-06-12 Thread Kostas Kloudas
Done.

> On Jun 12, 2017, at 12:24 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Can you add link to this thread in the JIRA ?
> 
> Cheers
> 
> On Mon, Jun 12, 2017 at 3:15 AM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Unfortunately, there was no discussion as this regression came as an 
> artifact of the addition of the IterativeConditions, but it will be fixed.
> 
> This is the JIRA to track it:
> https://issues.apache.org/jira/browse/FLINK-6897 
> <https://issues.apache.org/jira/browse/FLINK-6897>
> 
> Kostas
> 
>> On Jun 12, 2017, at 11:51 AM, Ted Yu <yuzhih...@gmail.com 
>> <mailto:yuzhih...@gmail.com>> wrote:
>> 
>> Do know which JIRA / discussion thread had the context for this decision ?
>> 
>> I did a quick search in JIRA but only found FLINK-3681.
>> 
>> Cheers
>> 
>> On Mon, Jun 12, 2017 at 1:48 AM, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> Hi David and Ted,
>> 
>> The documentation is outdated. I will update it today.
>> Java8 Lambdas are NOT supported by CEP in Flink 1.3.
>> 
>> Hopefully this will change soon. I will open a JIRA for this.
>> 
>> Cheers,
>> Kostas
>> 
>>> On Jun 11, 2017, at 11:55 PM, Ted Yu <yuzhih...@gmail.com 
>>> <mailto:yuzhih...@gmail.com>> wrote:
>>> 
>>>  
>> 
>> 
> 
> 



Re: Fink: KafkaProducer Data Loss

2017-06-01 Thread Kostas Kloudas
Hi Ninad,

I think that Gordon could shed some more light on this but I suggest 
you should update your Flink version to at least the 1.2. 

The reason is that we are already in the process of releasing Flink 1.3 
(which will come probably today) and a lot of things have 
changed/fixed/improved since the 1.1 release. In fact, it would help us
a lot if you could check if your problem still exists in the upcoming 1.3 
release.

In addition, I suppose that the 1.1 release will soon no longer be 
supported.

Cheers,
Kostas

> On Jun 1, 2017, at 12:15 AM, ninad  wrote:
> 
> Thanks for the fix guys. I am trying to test this with 1.1.5, but still
> seeing a data loss. I am not able to get much from logs except this:
> 
> Here's our use case:
> 
> 1) Consume from Kafka
> 2) Apply session window
> 3) Send messages of window to Kafka
> 
> If there's a failure in step 3, because all kafka brokers are down, we see a
> data loss. 
> 
> Here are relevant logs:
> 
> java.lang.Exception: Could not perform checkpoint 2 for operator
> TriggerWindow(ProcessingTimeSessionWindows(3),
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67},
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->
> Sink: sink.http.sep (2/4).
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
>   at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
>   at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
>   at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
>   at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
>   at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th
> operator in chain.
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
>   ... 8 more
> Caused by: java.lang.Exception: Failed to snapshot function state of
> TriggerWindow(ProcessingTimeSessionWindows(3),
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67},
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->
> Sink: sink.http.sep (2/4).
>   at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
>   ... 9 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)
> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13412.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: No Alerts with FinkCEP

2017-05-31 Thread Kostas Kloudas
You could also remove the autoWatermarkInterval, if you are satisfied with 
ProcessingTime.

Although keep in mind that processingTime assigns timestamps to elements based 
on the order
that they arrive to the operator. This means that replaying the same stream can 
give different 
results.

If you care about time handling and reproducibility of your results you could 
use event or 
ingestion time (the latter means that each element will have its timestamp based on 
when it was first ingested by your Flink job).
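
For example (sketch), switching the job to ingestion time is a single setting on the 
environment:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);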

For more information on “time” you can look here 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html 


Cheers,
Kostas

> On May 31, 2017, at 2:29 PM, Biplob Biswas  wrote:
> 
> Hi Kostas,
> 
> My application didn't have any timestamp extractor nor my events had any
> timestamp. Still I was using event time for processing it, probably that's
> why it was blocked.
> 
> Now I removed the part where I mention timechracteristics as Event time and
> it works now.
> 
> For example:
> 
> Previously:
> 
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000);
> 
> And MyEvent was (No field for timestamp):
> 
> public class BAMEvent {
>  private String id;
>  private String eventName;
>  private String eventId;
>  private List correlationID;
> 
> //Getters
> 
> //Setters
> }
> 
> 
> Now:
> 
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> /* env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); */
> env.getConfig().setAutoWatermarkInterval(1000);
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp1p13405.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: No Alerts with FinkCEP

2017-05-31 Thread Kostas Kloudas
Hi Biplob,

Great to hear that everything worked out and that you are not blocked!

For the timestamp assigning issue, you mean that you specified no timestamp
extractor in your job and all your elements had Long.MIN_VALUE timestamp right?
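
(If you do want event time with these events at some point, each element needs a 
timestamp plus an assigner, e.g. this untested sketch, where getTimestamp() is a field 
you would have to add to BAMEvent:

DataStream<BAMEvent> withTimestamps = events
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(5)) {
          @Override
          public long extractTimestamp(BAMEvent event) {
            return event.getTimestamp();
          }
        });

Otherwise, with no extractor, every element keeps the Long.MIN_VALUE timestamp.)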

Kostas

> On May 31, 2017, at 1:28 PM, Biplob Biswas  wrote:
> 
> Hi Dawid,
> 
> Thanks for the response. Timeout patterns work like a charm, I saw it
> previously but didn't understood what it does, thanks for explaining that.
> 
> Also, my problem with no alerts is solved now.
> 
> The problem was that I was using "Event Time" for processing whereas my
> events didn't have any timestamp in itself.  So I removed that part (I am
> assuming its working on the processing time now), and now Flink CEP works
> like a charm. 
> 
> Maybe if in the future there's some log stating the same, it would be easier
> and faster to debug the issue. Right now because it printed nothing I had to
> trial and error.
> 
> Anyway, FlinkCEP just saved me days of work (here we were thinking of using
> Spark structured streaming with windowing for the corresponding CEP)
> 
> Thanks a lot for all the Help, I have never been disappointed with the Flink
> user Group. :) 
> 
> Regards,
> Biplob
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp1p13401.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Does RichFilterFunction work on multiple thread?

2017-05-26 Thread Kostas Kloudas
Your objects will be processed by a single thread.
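
To be precise: single-threaded per parallel instance of the operator. If you raise the 
parallelism you get that many independent copies of the filter, each processing its 
share of the records in its own single thread. A small sketch (MyPojo is a placeholder 
type):

DataStream<MyPojo> filtered = input
    .filter(new RichFilterFunction<MyPojo>() {
      @Override
      public boolean filter(MyPojo value) {
        // invoked by exactly one thread per parallel subtask
        return value != null;
      }
    })
    .setParallelism(4);   // four independent single-threaded subtasks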

Kostas

> On May 26, 2017, at 4:50 PM, luutuan  wrote:
> 
> Hi, when I have a set of objects goes through a RichFilterFunction, by
> default, will the filter handle all objects in 1 single thread or will
> divide the work to multiple threads?
> Thank you.
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-RichFilterFunction-work-on-multiple-thread-tp13346.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: No Alerts with FinkCEP

2017-05-26 Thread Kostas Kloudas
Hi Biplob,

For the 1.4 version, the input of the select function has changed to expect a 
list of matching events per pattern name (Map<String, List<IN>> instead of 
Map<String, IN>), as we have added quantifiers. 

Also the FilterFunction has changed to SimpleCondition. 
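
To make both changes concrete against your collection example, the conditions become 
SimpleConditions and the select function receives lists (untested sketch):

Pattern<MyEvent, ?> pattern = Pattern.<MyEvent>begin("a")
    .where(new SimpleCondition<MyEvent>() {
      @Override
      public boolean filter(MyEvent event) {
        return event.getPayload() == 'a';
      }
    })
    .followedBy("b")
    .where(new SimpleCondition<MyEvent>() {
      @Override
      public boolean filter(MyEvent event) {
        return event.getPayload() == 'b';
      }
    })
    .within(Time.seconds(1));

patternStream.select(new PatternSelectFunction<MyEvent, String>() {
  @Override
  public String select(Map<String, List<MyEvent>> match) {
    // each pattern name now maps to a List of events because of quantifiers
    return match.get("a").get(0) + " -> " + match.get("b").get(0);
  }
}).print();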

The documentation is lagging a bit behind, but it is coming soon.

Now for the code I will have to dig into it a bit more.

Kostas

> On May 26, 2017, at 4:07 PM, Biplob Biswas  wrote:
> 
> Hello Kostas,
> 
> Thanks for the suggestions.
> 
> I checked and I am getting my events in the partitionedInput stream when i
> am printing it but still nothing on the alert side. I checked flink UI for
> backpressure and all seems to be normal (I am having at max 1000 events per
> second on the kafka topic so  don't think backpressure could be a problem,
> atleast I expect so)
> 
> Also, I haven't run my test with my test data as a collection but I tried
> with this following example and I did get alerts as a result: 
> 
> 
> // CEPTest using collection
> 
> List inputElements = new ArrayList<>();
>inputElements.add(new MyEvent(1, 'a', 1, 1));
>inputElements.add(new MyEvent(1, 'b', 1, 2));
>inputElements.add(new MyEvent(1, 'b', 2, 2));
>inputElements.add(new MyEvent(1, 'b', 3, 5));
> 
>Pattern pattern = Pattern.begin("a").where(new
> FilterFunction() {
>  private static final long serialVersionUID = 7219646616484327688L;
> 
>  @Override
>  public boolean filter(MyEvent myEvent) throws Exception {
>return myEvent.getPayload() == 'a';
>  }
>}).followedBy("b").where(new FilterFunction() {
>  private static final long serialVersionUID = 7219646616484327688L;
> 
>  @Override
>  public boolean filter(MyEvent myEvent) throws Exception {
>return myEvent.getPayload() == 'b';
>  }
>}).within(Time.seconds(1));//.within(Time.milliseconds(2L));
> 
>StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>env.getConfig().setAutoWatermarkInterval(1000);
> 
>DataStream input =
> env.fromCollection(inputElements).assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor() {
>  private static final long serialVersionUID = -6619787346214245526L;
> 
>  @Override
>  public long extractAscendingTimestamp(MyEvent myEvent) {
>return myEvent.getTimestamp();
>  }
>});
> 
>PatternStream patternStream = CEP.pattern(input.keyBy(new
> KeySelector() {
>  private static final long serialVersionUID = 6928745840509494198L;
> 
>  @Override
>  public Long getKey(MyEvent myEvent) throws Exception {
>return myEvent.getId();
>  }
>}), pattern);
> 
> 
>patternStream.select(new PatternTimeoutFunction() {
>  @Override
>  public String timeout(Map map, long l) throws
> Exception {
>return map.toString() +" @ "+ l;
>  }
> 
>  private static final long serialVersionUID = 300759199619789416L;
> 
> 
>}, new PatternSelectFunction() {
> 
>  @Override
>  public String select(Map map) throws Exception {
>return map.toString();
>  }
> 
>  private static final long serialVersionUID = 732172159423132724L;
>}).print();
> 
> 
> 
> Also along with that now I upgraded my flink maven project to 1.4-Snapshot
> and there seems to be a problem there. 
> 
> According to  this
> 
>  
> : 
> 
> class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
>@Override
>public OUT select(Map<String, IN> pattern) {
>IN startEvent = pattern.get("start");
>IN endEvent = pattern.get("end");
>return new OUT(startEvent, endEvent);
>}
> }
> 
> but when I am doing it it expects a list from my side for the events:
> 
> class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
>  @Override
>  public OUT select(Map<String, List<IN>> map) throws Exception {
>return null;
>  }
> }
> 
> Not really sure what am I doing wrong here, any inputs would be really
> helpful.
> 
> Regards,
> Biplob
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp1p13341.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Fwd: invalid type code: 00

2017-05-26 Thread Kostas Kloudas
I am forwarding Stefan’s reply:

Hi,

this problem can be caused by https://issues.apache.org/jira/browse/FLINK-6044. 
It is fixed in 1.2.1 and 1.3.

Best,
Stefan

>> On May 26, 2017, at 4:16 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> 
>> Hi,
>> 
>> Could you provide some info on when is this error happening?
>> 
>> From what I see you are using the heap or fs state backend and you are
>> failing to read the state back when restoring from a failure. The failure 
>> can 
>> be unrelated to this, but it could be useful if you could check the task 
>> manager
>> logs to see why the failure happens and if there are any exceptions when 
>> checkpointing 
>> for example.
>> 
>> Also could you share the code for your objects (what you are serializing)?
>> Just to cover some cases.
>> 
>> I also include Stefan in this.
>> 
>> Kostas 
>> 
>>> On May 26, 2017, at 1:06 PM, rhashmi <rizhas...@hotmail.com> wrote:
>>> 
>>> Which Flink version you are using?  1.2 
>>> What is your job doing (e.g. operators that you are using)? ProcessFunction
>>> to determine if event is late change event time to current & then window
>>> Which operator throws this exception? i will have to dig it further
>>> Which state-backend are you using? mysql. 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-tp13326p13332.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>> at Nabble.com.
>> 
> 



Fwd: invalid type code: 00

2017-05-26 Thread Kostas Kloudas
I am forwarding Stefan’s reply here:

> Hi,
> 
> this problem can be caused by 
> https://issues.apache.org/jira/browse/FLINK-6044 
> <https://issues.apache.org/jira/browse/FLINK-6044>. It is fixed in 1.2.1 and 
> 1.3.
> 
> Best,
> Stefan
> 
>> On May 26, 2017, at 4:16 PM, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> Could you provide some info on when is this error happening?
>> 
>> From what I see you are using the heap or fs state backend and you are
>> failing to read the state back when restoring from a failure. The failure 
>> can 
>> be unrelated to this, but it could be useful if you could check the task 
>> manager
>> logs to see why the failure happens and if there are any exceptions when 
>> checkpointing 
>> for example.
>> 
>> Also could you share the code for your objects (what you are serializing)?
>> Just to cover some cases.
>> 
>> I also include Stefan in this.
>> 
>> Kostas 
>> 
>>> On May 26, 2017, at 1:06 PM, rhashmi <rizhas...@hotmail.com 
>>> <mailto:rizhas...@hotmail.com>> wrote:
>>> 
>>> Which Flink version you are using?  1.2 
>>> What is your job doing (e.g. operators that you are using)? ProcessFunction
>>> to determine if event is late change event time to current & then window
>>> Which operator throws this exception? i will have to dig it further
>>> Which state-backend are you using? mysql. 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-tp13326p13332.html
>>>  
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-tp13326p13332.html>
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>> at Nabble.com <http://nabble.com/>.
>> 
> 



Re: invalid type code: 00

2017-05-26 Thread Kostas Kloudas
Hi,

Could you provide some info on when is this error happening?

From what I see you are using the heap or fs state backend and you are
failing to read the state back when restoring from a failure. The failure can 
be unrelated to this, but it could be useful if you could check the task manager
logs to see why the failure happens and if there are any exceptions when 
checkpointing 
for example.

Also could you share the code for your objects (what you are serializing)?
Just to cover some cases.

I also include Stefan in this.

Kostas 

> On May 26, 2017, at 1:06 PM, rhashmi  wrote:
> 
> Which Flink version you are using?  1.2 
> What is your job doing (e.g. operators that you are using)? ProcessFunction
> to determine if event is late change event time to current & then window
> Which operator throws this exception? i will have to dig it further
> Which state-backend are you using? mysql. 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/invalid-type-code-00-tp13326p13332.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: No Alerts with FinkCEP

2017-05-26 Thread Kostas Kloudas
One additional comment, from your code it seems you are using Flink 1.2.
It would be worth upgrading to 1.3. The updated CEP library includes a lot of 
new features and bugfixes.

Cheers,
Kostas

> On May 26, 2017, at 3:33 PM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi Biplob,
> 
> From a first scan of the code I cannot find sth fishy.
> 
> You are working on ProcessingTime, given that you do not 
> provide any time characteristic specification, right?
> 
> In this case, if you print your partitionedInput stream, do you 
> see elements flowing as expected?
> 
> If elements are flowing normally, is any back pressure created? 
> Or you keep on reading records from kafka uninterrupted? 
> I am asking to see if the CEP operator is doing sth that blocks the 
> pipeline or it just discards the elements.
> 
> It could be also worth trying to add a source with artificial elements 
> env.fromCollection(…) 
> to see if in this case everything works normally.
> 
> Kostas
> 
>> On May 26, 2017, at 2:25 PM, Biplob Biswas <revolutioni...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I just started exploring Flink CEP a day back and I thought I can use it to
>> make a simple event processor. For that I looked into the CEP examples by
>> Till and some other articles. 
>> 
>> Now I have 2 questions which i would like to ask:
>> 
>> *Part 1:*
>> 
>> I came up with the following piece of code, but this is not working as
>> expected.
>> 
>> /// Main **///
>> 
>> 
>> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010<>(
>>   "testTopic",
>>   new SimpleStringSchema(),
>>   props);
>> 
>>   DataStream input = env.addSource(consumer);
>>   LOG.info("About to process events");
>>   DataStream events =
>>   input
>>   //.map(s -> s.f1)
>>   .map(new MapStringToRRE())
>>   .filter(Objects::nonNull);
>> 
>>   //events.print();
>> 
>>   DataStream partitionedInput = events
>>   .keyBy((KeySelector<ReadEventType, String>) value ->
>> value.getRawTransactionItem().getChargedAccount());
>> 
>>   Pattern<ReadEventType, ?> pattern =
>> Pattern.begin("first")
>>   .where(event -> event.getFormat() == FormatType.FILE)
>>   .followedBy("second")
>>   .where(event -> event.getFormat() == FormatType.FILE)
>>   .within(Time.seconds(1));
>> 
>>   PatternStream patternStream =
>> CEP.pattern(partitionedInput, pattern);
>> 
>>   DataStream alerts =
>> patternStream.select((PatternSelectFunction<ReadEventType, String>)
>> CEPForBAMRRE::createAlert);
>> 
>>   alerts.print();
>> 
>>   env.execute("CEP monitoring job");
>> }
>> 
>> 
>> ///*** Alert Function returning just concat of txn id
>> ***///
>> 
>> private static String createAlert(Map<String, ReadEventType> pattern) {
>>   return pattern.get("first").getTransactionItem().getUid() + " " +
>>   pattern.get("second").getTransactionItem().getUid();
>> }
>> 
>> ///*** properties for kafka **///
>> 
>> private static Properties getDefaultProperties(Properties prop){
>>   prop.put("group.id", "FlinkCEP");
>>   prop.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>>   prop.put("zookeeper.connect", ZKEEPER);
>>   prop.put("auto.offset.reset", "earliest");
>>   return prop;
>> }
>> 
>> 
>> As my kafka topic only sends me events with formattype = FILE, I was
>> expecting to see multiple alerts being raised. But that's not the case, I am
>> not getting any alert at the moment.
>> 
>> Can anyone point out what am I doing wrong? 
>> 
>> PART 2: 
>> 
>> Also, my main aim for using CEP is to read from different topics and raise
>> alert if a second event is *not* followed by a first event within a given
>> time interval. How can I achieve it with FlinkCEP? for now I can only see
>> that if 2 events follow within a time interval an alert should be raised. 
>> 
>> 
>> Thanks & Regards,
>> Biplob
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp1.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
> 



Re: No Alerts with FinkCEP

2017-05-26 Thread Kostas Kloudas
Hi Biplob,

From a first scan of the code I cannot find sth fishy.

You are working on ProcessingTime, given that you do not 
provide any time characteristic specification, right?

In this case, if you print your partitionedInput stream, do you 
see elements flowing as expected?

If elements are flowing normally, is any back pressure created? 
Or you keep on reading records from kafka uninterrupted? 
I am asking to see if the CEP operator is doing sth that blocks the 
pipeline or it just discards the elements.

It could be also worth trying to add a source with artificial elements 
env.fromCollection(…) 
to see if in this case everything works normally.
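
Regarding your "PART 2" question further down (alerting when the second event does 
not follow the first within the interval): the usual approach is to keep the within() 
timeout and emit the alert from the timeout side of the pattern stream. A rough, 
untested sketch with your types (Either comes from org.apache.flink.types):

DataStream<Either<String, String>> matchesAndTimeouts = patternStream.select(
    new PatternTimeoutFunction<ReadEventType, String>() {
      @Override
      public String timeout(Map<String, ReadEventType> partialMatch, long timeoutTimestamp) {
        // "first" was seen but "second" never arrived in time -> this is your alert
        return "ALERT: " + partialMatch.get("first").getTransactionItem().getUid();
      }
    },
    new PatternSelectFunction<ReadEventType, String>() {
      @Override
      public String select(Map<String, ReadEventType> match) {
        return "OK: " + match.get("second").getTransactionItem().getUid();
      }
    });

The Either.Left side of that stream then carries exactly the "second event missing" 
alerts you are after.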

Kostas

> On May 26, 2017, at 2:25 PM, Biplob Biswas  wrote:
> 
> Hi,
> 
> I just started exploring Flink CEP a day back and I thought I can use it to
> make a simple event processor. For that I looked into the CEP examples by
> Till and some other articles. 
> 
> Now I have 2 questions which i would like to ask:
> 
> *Part 1:*
> 
> I came up with the following piece of code, but this is not working as
> expected.
> 
> /// Main **///
> 
> 
> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010<>(
>"testTopic",
>new SimpleStringSchema(),
>props);
> 
>DataStream input = env.addSource(consumer);
>LOG.info("About to process events");
>DataStream events =
>input
>//.map(s -> s.f1)
>.map(new MapStringToRRE())
>.filter(Objects::nonNull);
> 
>//events.print();
> 
>DataStream partitionedInput = events
>.keyBy((KeySelector<ReadEventType, String>) value ->
> value.getRawTransactionItem().getChargedAccount());
> 
>Pattern pattern =
> Pattern.begin("first")
>.where(event -> event.getFormat() == FormatType.FILE)
>.followedBy("second")
>.where(event -> event.getFormat() == FormatType.FILE)
>.within(Time.seconds(1));
> 
>PatternStream patternStream =
> CEP.pattern(partitionedInput, pattern);
> 
>DataStream alerts =
> patternStream.select((PatternSelectFunction)
> CEPForBAMRRE::createAlert);
> 
>alerts.print();
> 
>env.execute("CEP monitoring job");
>  }
> 
> 
> ///*** Alert Function returning just concat of txn id
> ***///
> 
>  private static String createAlert(Map pattern) {
>return pattern.get("first").getTransactionItem().getUid() + " " +
>pattern.get("second").getTransactionItem().getUid();
>  }
> 
> ///*** properties for kafka **///
> 
>  private static Properties getDefaultProperties(Properties prop){
>prop.put("group.id", "FlinkCEP");
>prop.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>prop.put("zookeeper.connect", ZKEEPER);
>prop.put("auto.offset.reset", "earliest");
>return prop;
>  }
> 
> 
> As my kafka topic only sends me events with formattype = FILE, I was
> expecting to see multiple alerts being raised. But that's not the case, I am
> not getting any alert at the moment.
> 
> Can anyone point out what am I doing wrong? 
> 
> PART 2: 
> 
> Also, my main aim for using CEP is to read from different topics and raise
> alert if a second event is *not* followed by a first event within a given
> time interval. How can I achieve it with FlinkCEP? for now I can only see
> that if 2 events follow within a time interval an alert should be raised. 
> 
> 
> Thanks & Regards,
> Biplob
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp1.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Are timers in ProcessFunction fault tolerant?

2017-05-26 Thread Kostas Kloudas
Yes, that is correct.
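
For reference, a minimal (untested, 1.3-style) keyed ProcessFunction relying on exactly 
that -- MyEvent is a placeholder type, and the one-day delay mirrors your example:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TimeoutFunction extends ProcessFunction<MyEvent, String> {

  private transient ValueState<Long> seenAt;

  @Override
  public void open(Configuration parameters) {
    seenAt = getRuntimeContext().getState(
        new ValueStateDescriptor<>("seen-at", Long.class));
  }

  @Override
  public void processElement(MyEvent event, Context ctx, Collector<String> out) throws Exception {
    long now = ctx.timerService().currentProcessingTime();
    seenAt.update(now);
    // registered for the current key; fires a day later even after a failover
    ctx.timerService().registerProcessingTimeTimer(now + 24 * 60 * 60 * 1000L);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    out.collect("timer fired at " + timestamp + " for state " + seenAt.value());
    seenAt.clear();   // clears the value state, not any other pending timers
  }
}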

Kostas

> On May 26, 2017, at 11:05 AM, Moiz S Jinia <moiz.ji...@gmail.com> wrote:
> 
> Thanks Kostas. So even though the timer state is managed separately from the 
> key state (from runtimeContext) I can safely assume both the states to be 
> fault tolerant and maintain association with the key of the stream?
> 
> On Fri, May 26, 2017 at 1:51 PM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Hi Moiz,
> 
> state.clear() refers to the state that you have registered in your job, using 
> the getState()
> from the runtimeContext.
>  
> Timers are managed by Flink’s timer service and they are cleaned up by Flink 
> itself when 
> the job terminates.
> 
> Kostas
> 
>> On May 26, 2017, at 6:41 AM, Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>> wrote:
>> 
>> A follow on question. Since the registered timers are part of the managed 
>> key state, do the timers get cancelled when i call state.clear()?
>> 
>> Moiz
>> 
>> On Thu, May 25, 2017 at 10:20 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>> wrote:
>> Awesome. Thanks.
>> 
>> On Thu, May 25, 2017 at 10:13 PM, Eron Wright <eronwri...@gmail.com 
>> <mailto:eronwri...@gmail.com>> wrote:
>> Yes, registered timers are stored in managed keyed state and should be 
>> fault-tolerant. 
>> 
>> -Eron
>> 
>> On Thu, May 25, 2017 at 9:28 AM, Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>> wrote:
>> With a checkpointed RocksDB based state backend, can I expect the registered 
>> processing timers to be fault tolerant? (along with the managed keyed state).
>> 
>> Example -
>> A task manager instance owns the key k1 (from a keyed stream) that has 
>> registered a processing timer with a timestamp thats a day ahead in the 
>> future. If this instance is killed, and the key is moved to another 
>> instance, will the onTimer trigger correctly on the other machine at the 
>> expected time with the same keyed state (for k1)?
>> 
>> Thanks,
>> Moiz
>> 
>> 
>> 
> 
> 



Re: invalid type code: 00

2017-05-26 Thread Kostas Kloudas
Hi! 

Can you give us some information about your job? 

Which Flink version you are using? 
What is your job doing (e.g. operators that you are using)?
Which operator throws this exception?
Which state-backend are you using?

This exception means that you cannot retrieve your state
because of some error while deserializing some Pojo objects.

Thanks,
Kostas

> On May 26, 2017, at 1:13 AM, rhashmi  wrote:
> 
> Sporadically I am seeing this error. Any idea?
> 
> 
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:664)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:651)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1381)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>   at java.util.HashMap.readObject(HashMap.java:1404)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>   at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>   at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>   at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
>   at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.readObject(PojoSerializer.java:130)
>   at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>   at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>   at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:292)
>   at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy.read(TypeSerializerSerializationProxy.java:97)
>   at
> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:88)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:299)
>   at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:799)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
>   ... 6 more
> 
> 
> 



Re: Are timers in ProcessFunction fault tolerant?

2017-05-26 Thread Kostas Kloudas
Hi Moiz,

state.clear() refers to the state that you have registered in your job, using 
the getState()
from the runtimeContext.
 
Timers are managed by Flink’s timer service and they are cleaned up by Flink 
itself when 
the job terminates.

Kostas

> On May 26, 2017, at 6:41 AM, Moiz S Jinia  wrote:
> 
> A follow on question. Since the registered timers are part of the managed key 
> state, do the timers get cancelled when i call state.clear()?
> 
> Moiz
> 
> On Thu, May 25, 2017 at 10:20 PM, Moiz S Jinia  > wrote:
> Awesome. Thanks.
> 
> On Thu, May 25, 2017 at 10:13 PM, Eron Wright  > wrote:
> Yes, registered timers are stored in managed keyed state and should be 
> fault-tolerant. 
> 
> -Eron
> 
> On Thu, May 25, 2017 at 9:28 AM, Moiz S Jinia  > wrote:
> With a checkpointed RocksDB based state backend, can I expect the registered 
> processing timers to be fault tolerant? (along with the managed keyed state).
> 
> Example -
> A task manager instance owns the key k1 (from a keyed stream) that has 
> registered a processing timer with a timestamp thats a day ahead in the 
> future. If this instance is killed, and the key is moved to another instance, 
> will the onTimer trigger correctly on the other machine at the expected time 
> with the same keyed state (for k1)?
> 
> Thanks,
> Moiz
> 
> 
> 



Re: Question about start with checkpoint.

2017-05-20 Thread Kostas Kloudas
Hi,

In order to change parallelism, you should take a savepoint, as described here:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
 


Kostas

> On May 21, 2017, at 5:43 AM, yunfan123  wrote:
> 
> How does this exactly work?
> For example, I save my state using RocksDB + HDFS.
> When I change the parallelism of my job, can starting from a checkpoint work?
> 
> 
> 



Re: FlinkCEP latency/throughput

2017-05-17 Thread Kostas Kloudas
Hello Alfred,

As a first general remark, Flink was not optimized for multicore deployments 
but rather for distributed environments. This implies overheads (serialization, 
communication etc), when compared to libs optimized for multicores. So there
may be libraries that are better optimized for those settings if you are 
planning 
to use just a multicore machine.

Now for your suggestion:

> On May 16, 2017, at 6:03 PM, Sonex  wrote:
> 
> Hello everyone,
> 
> I am testing some patterns with FlinkCEP and I want to measure latency and
> throughput when using 1 or more processing cores. How can I do that ??
> 
> What I have done so far:
> Latency: Each time an event arrives I store the system time
> (System.currentTimeMillis). When flink calls the select function which means
> we have a full pattern match, again I take the system time. The difference
> of the system time taken from the first event of the complex event and the
> system time taken when the function is called is the latency for now.
> 

1) If you are using event time, then you are also accounting for internal 
buffering and 
ordering of the incoming events.
 
2) I am not sure if measuring the time between the arrival of each element, and 
when 
its matching pattern is emitted makes much sense. In a long pattern, the first 
element
in the matching pattern will wait inevitably longer than the last one, right?

> Throughput: I divide the total number of the events of the dataset by the
> time taken to complete the experiment.
> 
> 

For throughput you could create a job that contains only the CEP pattern and a 
sink that does nothing, and count the elements read by your source per minute. 
If your source is not the bottleneck, then the CEP part of the pipeline is the 
dominating factor (given that your sink just discards everything, so it cannot 
create backpressure).
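
As a rough illustration of that setup (the type names, the counting map and the "matches" stream are assumptions, not part of the suggestion above): count elements right after the source and discard the CEP output, e.g.

// count the elements read from the source, before the CEP part of the pipeline
DataStream<Event> counted = input.map(new RichMapFunction<Event, Event>() {
    private transient long count;
    private transient long start;

    @Override
    public void open(Configuration parameters) {
        start = System.currentTimeMillis();
    }

    @Override
    public Event map(Event value) {
        if (++count % 100_000 == 0) {
            long elapsed = System.currentTimeMillis() - start;
            System.out.println("source rate: " + (count * 1000.0 / elapsed) + " elements/s");
        }
        return value;
    }
});

// ... apply the CEP pattern on "counted" exactly as in the real job ...

// a sink that does nothing, so it cannot create backpressure
matches.addSink(new DiscardingSink<Alert>());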

I hope this helps,
Kostas

Re: Timer fault tolerance in Flink

2017-05-17 Thread Kostas Kloudas
Hi Rahul,

The timers are fault tolerant and their timestamp is the absolute value of when 
to fire.
This means that if you are at time t = 10 and you register a timer “10 ms from 
now”, the timer will have a firing timestamp of 20.
This is checkpointed, so the “new machine” that takes over the failed task, 
will have the timer with timestamp 20.

So when the timer fires depends on the “new machine”, and it may differ from what 
would have happened on the previous machine in the following cases:
- For processing time, in case your new machine (the one that takes over 
  the failed task) has a clock that is out-of-sync with the 
  previous machine that set the timer to 20.
- For event time, given that Flink does not checkpoint watermarks, the 
  timer will fire when the watermark on the new machine surpasses 
  the timer's timestamp.
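
To make the first point concrete, a tiny sketch (assuming a keyed ProcessFunction and its context ctx):

// at processing time t = 10, registering a timer "10 ms from now"
long now = ctx.timerService().currentProcessingTime();      // 10
ctx.timerService().registerProcessingTimeTimer(now + 10);   // absolute firing timestamp 20 is what gets checkpointed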

I hope this helps,
Kostas

> On May 17, 2017, at 12:36 PM, Rahul Kavale  wrote:
> 
> I am looking at timers in Apache Flink and wanted to confirm whether the timers in 
> Flink are fault tolerant.
> 
> E.g. a timer of, say, 20 sec is registered with a ProcessFunction and is running on 
> a node, and after 15 seconds (since the timer started) the node fails for 
> some reason. Does Flink guarantee that the timer resumes on another node? If 
> it does resume, does it consider only the remaining time for the timer, i.e. 5 
> sec in this case?
> 
> Thanks & Regards,
> 
> Rahul
> 



Re: Stateful streaming question

2017-05-17 Thread Kostas Kloudas
Hi Flavio,

For setting the retries, unfortunately there is no such setting yet and, if I 
am not wrong, in case of a failure of a request, 
an exception will be thrown and the job will restart. I am also including Till 
in the thread as he may know better.

For consistency guarantees and concurrency control, this depends on your 
underlying backend. But if you want to 
have end-to-end control, then you could do as Ankit suggested at his point 3), 
i.e have a single job for the whole pipeline
 (if this fits your needs of course). This will allow you to set your own 
“precedence” rules for your operations.

Now finally, there is no way currently to expose the state of a job to another 
job. The way to do so is either Queryable
State, or writing to a Sink. If the problem for having one job is that you emit 
one element at a time, you can always group
elements together and emit downstream less often, in batches.
 
Finally, if  you need 2 jobs, you can always use a hybrid solution where you 
keep your current state in Flink, and you dump it 
to a Sink that is queryable once per week for example. The Sink then can be 
queried at any time, and data will be at most one 
week old.

Thanks,
Kostas

> On May 17, 2017, at 9:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Ankit, just a brief comment on the "batch job is easier than streaming job" 
> argument. I'm not sure about that. 
> I can see that just the batch job might seem easier to implement, but this is 
> only one part of the whole story. The operational side of using batch is more 
> complex IMO. 
> You need a tool to ingest your stream, you need storage for the ingested 
> data, you need a periodic scheduler to kick off your batch job, and you need 
> to take care of failures if something goes wrong. 
> In the streaming case, this is not needed, or the framework does it for you.
> 
> Just my 2 cents, Fabian
> 
> 2017-05-16 20:58 GMT+02:00 Jain, Ankit <ankit.j...@here.com 
> <mailto:ankit.j...@here.com>>:
> Hi Flavio,
> 
> While you wait on an update from Kostas, wanted to understand the use-case 
> better and share my thoughts-
> 
>  
> 
> 1)   Why is the current batch mode expensive? Where are you persisting the 
> data after updates? The way I see it, by moving to Flink you get to use RocksDB (a 
> key-value store) that makes your lookups faster – probably right now you are 
> using a non-indexed store like S3 maybe?
> 
> So, the gain is coming from moving to a better persistence store suited to your 
> use-case than from batch->streaming. Maybe consider just going with a 
> different data store.
> 
> IMHO, streaming should only be used if you really want to act on the new events 
> in real-time. It is generally harder to get a streaming job correct than a 
> batch one.
> 
>  
> 
> 2)   If the current setup is expensive due to serialization-deserialization 
> then that should be fixed by moving to a faster format (maybe Avro? - I don’t 
> have a lot of expertise in that). I don’t see how that problem will go away 
> with Flink – so you still need to handle serialization.
> 
>  
> 
> 3)   Even if you do decide to move to Flink – I think you can do this 
> with one job, two jobs are not needed. At every incoming event, check the 
> previous state and update/output to kafka or whatever data store you are 
> using.
> 
>  
> 
>  
> 
> Thanks
> 
> Ankit
> 
>  
> 
> From: Flavio Pompermaier <pomperma...@okkam.it <mailto:pomperma...@okkam.it>>
> Date: Tuesday, May 16, 2017 at 9:31 AM
> To: Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>>
> Cc: user <user@flink.apache.org <mailto:user@flink.apache.org>>
> Subject: Re: Stateful streaming question
> 
>  
> 
> Hi Kostas,
> 
> thanks for your quick response. 
> 
> I also thought about using Async IO, I just need to figure out how to 
> correctly handle parallelism and the number of async requests. 
> 
> However, that's probably the way to go... Is it possible also to set a number of 
> retry attempts/backoff when the async request fails (maybe due to a too busy 
> server)?
> 
>  
> 
> For the second part I think it's ok to persist the state into RocksDB or 
> HDFS, my question is indeed about that: is it safe to start reading (with 
> another Flink job) from RocksDB or HDFS while there is an updatable state "pending" 
> on it? Should I ensure that state updates are not possible until the other 
> Flink job has finished reading the persisted data?
> 
>  
> 
> And another question... I've tried to draft such a process and basically I have 
> the following code:
> 
>  
> 
> DataStream groupedObj = tuples.keyBy(0)
> 
> .flatMap(n

Re: Stateful streaming question

2017-05-16 Thread Kostas Kloudas
Hi Flavio,

From what I understand, for the first part you are correct. You can use Flink’s 
internal state to keep your enriched data.
In fact, if you are also querying an external system to enrich your data, it is 
worth looking at the AsyncIO feature:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
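
A minimal sketch of how that can look with the 1.2-style API (the database client, its callback, and the use of Row elements are made-up placeholders):

public class EnrichWithDb implements AsyncFunction<Row, Tuple2<Row, String>> {

    private transient MyDbClient client;   // placeholder for your external lookup client, created lazily

    @Override
    public void asyncInvoke(final Row input, final AsyncCollector<Tuple2<Row, String>> collector) throws Exception {
        if (client == null) {
            client = new MyDbClient();
        }
        // the client is assumed to accept an asynchronous callback
        client.lookupAsync(input.getField(0), result ->
                collector.collect(Collections.singletonList(new Tuple2<>(input, result))));
    }
}

// at most 100 requests in flight, each timing out after 10 seconds
DataStream<Tuple2<Row, String>> enriched =
        AsyncDataStream.unorderedWait(tuples, new EnrichWithDb(), 10000, TimeUnit.MILLISECONDS, 100);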
 


Now for the second part, currently in Flink you cannot iterate over all 
registered keys for which you have state. A pointer 
that may be useful to look at is the queryable state:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html
 


This is still an experimental feature, but let us know your opinion if you use 
it.

Finally, an alternative would be to keep state in Flink, and periodically flush 
it to an external storage system, which you can
query at will.
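
As a sketch of the ListState idea from the question below (the Tuple4 element type and names are illustrative, and this assumes keyed ListState as available from Flink 1.2 on), the grouping could look roughly like:

public static class GroupByFirstField
        extends RichFlatMapFunction<Tuple4<String, String, String, String>,
                                    Tuple4<String, String, String, String>> {

    private transient ListState<Tuple4<String, String, String, String>> grouped;

    @Override
    public void open(Configuration parameters) {
        grouped = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "grouped",
                TypeInformation.of(new TypeHint<Tuple4<String, String, String, String>>() {})));
    }

    @Override
    public void flatMap(Tuple4<String, String, String, String> value,
                        Collector<Tuple4<String, String, String, String>> out) throws Exception {
        // no re-read / union / re-group of previously persisted data: just append to keyed state
        grouped.add(value);
        // periodically (e.g. every N elements, or from a timer in a ProcessFunction)
        // the contents of "grouped" can be flushed to an external store that other jobs query
    }
}

// usage: tuples.keyBy(0).flatMap(new GroupByFirstField());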

Thanks,
Kostas


> On May 16, 2017, at 4:38 PM, Flavio Pompermaier  wrote:
> 
> Hi to all,
> we're still playing with Flink streaming part in order to see whether it can 
> improve our current batch pipeline.
> At the moment, we have a job that translate incoming data (as Row) into 
> Tuple4, groups them together by the first field and persist the result to 
> disk (using a thrift object). When we need to add tuples to those grouped 
> objects we need to read again the persisted data, flat it back to Tuple4, 
> union with the new tuples, re-group by key and finally persist.
> 
> This is very expansive to do with batch computation while is should pretty 
> straightforward to do with streaming (from what I understood): I just need to 
> use ListState. Right?
> Then, let's say I need to scan all the data of the stateful computation (keys 
> and values) in order to do some other computation. I'd like to know:
> - how to do that, i.e. create a DataSet/DataSource from the stateful 
>   data in the stream
> - is there any problem with accessing the stateful data without stopping incoming 
>   data (and thus possible updates to the states)?
> Thanks in advance for the support,
> Flavio
> 



Re: Problem with Kafka Consumer

2017-05-16 Thread Kostas Kloudas
Hi Simone,

Glad I could help ;)

Actually it would be great if you could also try out the upcoming (not yet 
released) 1.3 version 
and let us know if you find something that does not work as expected.

We are currently in the phase of testing it, as you may have noticed, and every 
contribution to 
that front is more than welcomed.

Cheers,
Kostas

> On May 16, 2017, at 4:30 PM, simone <simone.povosca...@gmail.com> wrote:
> 
> Hi Kostas,
> 
> thanks for your suggestion. Indeed, replacing my custom sink with a simpler 
> one brought out that the cause of the problem was RowToQuery, as you 
> suggested. The sink was blocking the reads, making the Kafka pipeline stall, 
> due to a misconfiguration of an internal client that calls an external 
> service.
> 
> Thanks for your help,
> Simone.
> 
> On 16/05/2017 14:01, Kostas Kloudas wrote:
>> Hi Simone,
>> 
>> I suppose that you use messageStream.keyBy(…).window(…) right? .windowAll() 
>> is not applicable to keyedStreams.
>> 
>> Some follow up questions are:
>> 
>> In your logs, do you see any error messages? 
>> What does your RowToQuery() sink do? Can it be that it blocks and the back 
>> pressure makes all the pipeline stall?
>> To check that, you can: 
>>  1) check the webui for backpressure metrics
>>  2) replace your sink with a dummy one that just prints whatever it 
>> receives
>>  3) or even put a flatmap after reading from Kafka (before the keyBy()) 
>> that prints the elements before sending 
>>  them downstream, so that you know if the consumer keeps on 
>> reading.
>> 
>> Let us know what is the result for the previous.
>> 
>> Thanks,
>> Kostas
>> 
>>> On May 16, 2017, at 10:44 AM, simone <simone.povosca...@gmail.com 
>>> <mailto:simone.povosca...@gmail.com>> wrote:
>>> 
>>> Hi to all,
>>> 
>>> I have a problem with Flink and Kafka queues.
>>> 
>>> I have a Producer that puts some Rows into a data Sink represented by a 
>>> kafka queue and a Consumer that reads from this sink and process Rows in 
>>> buckets of N elements using custom trigger function
>>> messageStream.keyBy(0)
>>> .windowAll(GlobalWindows.create())
>>> .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
>>> .apply(new RowToQuery());
>>> 
>>> 
>>> The problem is that the Consumer, stop to consume data once reached about 
>>> 1000 rows.
>>> With N = 20 the consumer process 50 buckets for a total of 1000 elements. 
>>> With N = 21 the consumer process 48 buckets for a total of 1008 elements.
>>> With N = 68 the consumer process 15 buckets for a total of 1020 elements. 
>>> And so on... 
>>> The same happens also without using a custom trigger function, but with 
>>> simple CountTrigger function:
>>> 
>>> messageStream.keyBy(0)
>>> .windowAll(GlobalWindows.create())
>>>  .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>>>  .apply(new RowToQuery());
>>> How is it possible? Is there any properties on Consumer to be set in order 
>>> to process more data?
>>> 
>>> Thanks,
>>> 
>>> Simone.
>> 
> 



Re: Problem with Kafka Consumer

2017-05-16 Thread Kostas Kloudas
Hi Simone,

I suppose that you use messageStream.keyBy(…).window(…) right? .windowAll() is 
not applicable to keyedStreams.
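
For reference, the keyed variant would look roughly like this (a sketch, using the same classes as in the snippet below):

messageStream.keyBy(0)
        .window(GlobalWindows.create())                      // window(), not windowAll(), after keyBy()
        .trigger(PurgingTrigger.of(CountTrigger.of(N)))
        .apply(new RowToQuery());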

Some follow up questions are:

In your logs, do you see any error messages? 
What does your RowToQuery() sink do? Can it be that it blocks and the back 
pressure makes all the pipeline stall?
To check that, you can: 
1) check the webui for backpressure metrics
2) replace your sink with a dummy one that just prints whatever it 
receives
3) or even put a flatmap after reading from Kafka (before the keyBy()) 
that prints the elements before sending 
them downstream, so that you know if the consumer keeps on 
reading.
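
For 3), a minimal pass-through flatMap sketch (assuming the elements are Rows, as in the snippet below):

DataStream<Row> tapped = messageStream.flatMap(new FlatMapFunction<Row, Row>() {
    @Override
    public void flatMap(Row value, Collector<Row> out) {
        System.out.println("consumed from Kafka: " + value);   // shows whether the consumer keeps reading
        out.collect(value);                                     // forward the element unchanged
    }
});
// then apply the same keyBy(0) / window / trigger / apply(...) chain on "tapped"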

Let us know what is the result for the previous.

Thanks,
Kostas

> On May 16, 2017, at 10:44 AM, simone  wrote:
> 
> Hi to all,
> 
> I have a problem with Flink and Kafka queues.
> 
> I have a Producer that puts some Rows into a data Sink represented by a kafka 
> queue and a Consumer that reads from this sink and process Rows in buckets of 
> N elements using custom trigger function
> messageStream.keyBy(0)
> .windowAll(GlobalWindows.create())
> .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
> .apply(new RowToQuery());
> 
> 
> The problem is that the Consumer stops consuming data once it has read about 
> 1000 rows.
> With N = 20 the consumer processes 50 buckets for a total of 1000 elements. 
> With N = 21 the consumer processes 48 buckets for a total of 1008 elements.
> With N = 68 the consumer processes 15 buckets for a total of 1020 elements. And 
> so on... 
> The same happens also without using a custom trigger function, but with 
> simple CountTrigger function:
> 
> messageStream.keyBy(0)
> .windowAll(GlobalWindows.create())
>  .trigger(PurgingTrigger.of(CountTrigger.of(N)))
>  .apply(new RowToQuery());
> How is this possible? Are there any properties on the Consumer to be set in order to 
> process more data?
> 
> Thanks,
> 
> Simone.



Re: assignTimestampsAndWatermarks not working as expected

2017-05-04 Thread Kostas Kloudas
Hi Jayesh,

Glad that it finally worked! 

From a first look, I cannot spot anything wrong with the code itself.
The only thing I have to note is that the locations of the logs and the 
printouts  you put
in your code differ and normally they are not printed in the console.

Thanks,
Kostas

> On May 4, 2017, at 6:45 PM, Jayesh Patel  wrote:
> 
> I figured out what’s wrong – there was a silly mistake on my side.  There is 
> nothing wrong with the code  here, but please do let me know if you see 
> anything wrong with my approach.
>  
> Thank you.
>  
> From: Jayesh Patel 
> Sent: Thursday, May 04, 2017 10:00 AM
> To: 'user@flink.apache.org' 
> Subject: assignTimestampsAndWatermarks not working as expected
>  
> Can anybody see what’s wrong with the following code?  I am using Flink 1.2 
> and have tried running it in Eclipse (local mode) as well as on a 3 node 
> cluster and it’s not behaving as expected.
>  
> The idea is to have a custom source collect messages from a JMS topic (I have 
> a fake source for now that generates some out of order messages with event 
> time that is not delayed more than 5 seconds).  The source doesn’t 
> collectWithTimestamp() or emitWatermark().
> The messages (events) include the event time.  In order to allow for late or 
> out of order messages I use assignTimestampsAndWatermarks with 
> BoundedOutOfOrdernessTimestampExtractor and the extractTimestamp() method 
> retrieves the event time from the event.
>  
> When I run this job, I don’t get the printout from the extractTimestamp() 
> method, nor do I get the logTuples.print() or stampedLogs.print() output.  
> When running on the local environment(Eclipse) I do see the printouts from 
> the fake source (MockSource – not shown here).  But I don’t even get those 
> when run from my 3 node cluster with parallelism of 3.
>  
> public static void main(String[] args) throws Exception {
>final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>env.getConfig().setAutoWatermarkInterval(2000); // just for debugging, 
> didn’t affect the behavior
>  
>DataStream logs = env.addSource(new MockSource());
>DataStream> logTuples = logs.map(new 
> ParseEvent());
>logTuples.print();
>  
>  
>DataStream> stampedLogs = 
> logTuples.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor>(Time.seconds(5))
>  {
>  private static final long serialVersionUID = 1L;
>  @Override
>  public long extractTimestamp(Tuple2 
> element) {
> // This is how to extract timestamp from the event
>long eventTime = 
> element.f1.getEventStartTime().toInstant().toEpochMilli();
>System.out.println("returning event time " + 
> eventTime);
>return eventTime;
>  }});
>stampedLogs.print();
>env.execute(“simulation”);
> }
>  
> Thank you,
> Jayesh



Re: Long running time based Patterns

2017-05-04 Thread Kostas Kloudas
Hi Moiz,

Then it should work.
And the previous issue is already fixed on the master.

Kostas

> On May 4, 2017, at 6:02 PM, Moiz Jinia <moiz.ji...@gmail.com> wrote:
> 
> It'll definitely have a where clause. Just forgot to include it in the 
> example. Just meant to focus on the within clause.
> 
> Am on 1.3 - expect it'll be fixed by the time stable is out?
> 
> Thanks!
> 
> Moiz
> 
> —
> sent from phone
> 
> On 04-May-2017, at 8:12 PM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> 
> Hi Moiz,
> 
> You are on Flink 1.2 or 1.3? 
> In Flink 1.2 (latest stable) there are no known issues, so this will work 
> correctly. 
> Keep in mind that without any conditions (where-clauses), you will only get 
> all possible 
> 2-tuples of incoming elements, which could also be done with a simple process 
> function I would say.
> 
> In Flink 1.3 (unreleased) there is this issue: 
> https://issues.apache.org/jira/browse/FLINK-6445 
> <https://issues.apache.org/jira/browse/FLINK-6445>
> 
> Thanks,
> Kostas
> 
>> On May 4, 2017, at 1:45 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>> wrote:
>> 
>> Does Flink (with a persistent State backend such as RocksDB) work well with 
>> long running Patterns of this type? (running into days)
>> 
>> Pattern.begin("start").followedBy("end").within(Time.days(3))
>> 
>> Is there some gotchas here or things to watch out for?
>> 
>> Thanks,
>> Moiz
> 



Re: OperatorState partioning when recovering from failure

2017-05-04 Thread Kostas Kloudas
Hi Seth,

Upon restoring, splits will be re-shuffled among the new tasks, and I believe 
that state is repartitioned 
in a round robin way (although I am not 100% sure so I am also including Stefan 
and Aljoscha in this).
The priority queues will be reconstructed based on the restored elements. So 
task managers may get
a relatively equal number of splits, but “recent” ones may be concentrated on a 
few nodes. This may 
also have to do with how your monitor sends them to the reader (e.g. all splits 
of a recent file go to the 
same node).

As far as I know, we do not have an option for custom state re-partitioner.

To see what is restored, you can enable DEBUG logging and this will print upon 
restoring sth like:

"ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored 
{restoredReaderState}"

with the restoredReaderState containing the restored splits.

And something similar upon checkpointing. This will give you a better look in 
what may be happening.

Thanks,
Kostas

> On May 4, 2017, at 3:45 PM, Seth Wiesman  wrote:
> 
> I am curious about how operator state is repartitioned to subtasks when a job 
> is resumed from a checkpoint or savepoint. The reason is that I am having 
> issues with the ContinuousFileReaderOperator when recovering from a failure. 
>  
> I consume most of my data from files off S3. I have a custom file monitor 
> that understands how to walk my directory structure and outputs 
> TimestampedFileSplits downstream in chronological order to the stock 
> ContinuousFileReaderOperator. The reader consumes those splits and stores 
> them a priority queue based on their last modified time ensuring that files 
> are read in chronological order which is exactly what I want. The problem is 
> when recovering, the unread splits being partitioned out to each of the 
> subtasks seem to be heavily skewed in terms of last modified time.
>  
> While each task may have a similar number of files I find then one or two 
> will have a disproportionate number of old files. This in turn holds back my 
> watermark (sometimes for several hours depending on the number of unread 
> splits) which keeps timers from firing, windows from purging, etc.
>  
> I was hoping there were some way I could add a custom partitioner to ensure 
> that splits are uniformly distributed in a temporal manner or if someone had 
> other ideas of how I could mitigate the problem.
>  
> Thank you, 
>  
> Seth Wiesman 
>  



Re: Join two kafka topics

2017-05-04 Thread Kostas Kloudas
Perfect! 
Thanks a lot for the clarification!

Kostas

> On May 4, 2017, at 4:37 PM, Tarek khal  wrote:
> 
> Hi Kostas,
> 
> Yes, now is solved by the help of Jason.
> 
> Best,
> 
> 
> 



Re: Join two kafka topics

2017-05-04 Thread Kostas Kloudas
Hi Tarek,

This question seems to be a duplicate of your other question “ConnectedStream 
keyBy issues”, right?
I am just asking for clarification.

Thanks,
Kostas

> On May 4, 2017, at 1:41 PM, Tarek khal  wrote:
> 
> Hi Aljoscha,
> 
> I tested ConnectedStream and CoFlatMapFunction as you told me, but the result
> is not what I expected.
> 
> 
> *For the execution:*
> 
> 1) I added 3 rules on the "rules" topic (imei: "01", "02", "03") 
> 2) Produced 15 events with different imei, but I guess I have a problem with
> "keyBy"
> 
> *Result : *
> 
> 
>  
> 
> Code :
> 
> 
> Best,
> 
> 
> 
> 
> 



Re: Long running time based Patterns

2017-05-04 Thread Kostas Kloudas
Hi Moiz,

You are on Flink 1.2 or 1.3? 
In Flink 1.2 (latest stable) there are no known issues, so this will work 
correctly. 
Keep in mind that without any conditions (where-clauses), you will only get all 
possible 
2-tuples of incoming elements, which could also be done with a simple process 
function I would say.

In Flink 1.3 (unreleased) there is this issue: 
https://issues.apache.org/jira/browse/FLINK-6445 


Thanks,
Kostas

> On May 4, 2017, at 1:45 PM, Moiz S Jinia  wrote:
> 
> Does Flink (with a persistent State backend such as RocksDB) work well with 
> long running Patterns of this type? (running into days)
> 
> Pattern.begin("start").followedBy("end").within(Time.days(3))
> 
> Is there some gotchas here or things to watch out for?
> 
> Thanks,
> Moiz



Re: CEP timeout occurs even for a successful match when using followedBy

2017-05-02 Thread Kostas Kloudas
Glad to hear that Moiz!
And thanks for helping us test out the library.

Kostas

> On May 2, 2017, at 12:34 PM, Moiz S Jinia <moiz.ji...@gmail.com> wrote:
> 
> Thanks! I downloaded and built 1.3-SNAPSHOT locally and was able to verify 
> that followedBy now works as I want.
> 
> Moiz
> 
> On Sat, Apr 29, 2017 at 11:08 PM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Hi Moiz,
> 
> Here are the instructions on how to build Flink from source:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html>
> 
> Kostas
> 
>> On Apr 29, 2017, at 7:15 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>> wrote:
>> 
>> I meant maven dependencies that i can use by generating them from sources.
>> 
>> On Sat, Apr 29, 2017 at 10:31 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>> wrote:
>> Ok I'll try that. Its just that I'd rather use a stable version.
>> Are there any instructions for building binaries from latest sources?
>> 
>> Moiz
>> 
>> On Sat, Apr 29, 2017 at 10:09 PM, Kostas Kloudas 
>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
>> Hi Moiz,
>> 
>> The skip-till-next is a big change and backporting it does not seem 
>> feasible. 
>> Also this would require more general changes to the 1.2 to make it 
>> compatible with the previous 1.2 versions.
>> 
>> If you want you can already use the 1.3 version by downloading the master 
>> branch and writing your 
>> use-case against that. The changes until the final release are going to be 
>> minor hopefully and we can
>> always help you adjust your program accordingly.
>> 
>> Hope this helps,
>> Kostas
>> 
>>> On Apr 29, 2017, at 6:23 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>>> <mailto:moiz.ji...@gmail.com>> wrote:
>>> 
>>> Oh ok thats a bit far off. Is there any chance of a backport of 
>>> https://issues.apache.org/jira/browse/FLINK-6208 
>>> <https://issues.apache.org/jira/browse/FLINK-6208> to the 1.2 branch? I 
>>> require the SKIP_TILL_NEXT behaviour for a production use case that we want 
>>> to use Flink for.
>>> 
>>> Moiz
>>> 
>>> On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas 
>>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
>>> The 1.3 is scheduled for the beginning of June.
>>> 
>>> Cheers,
>>> Kostas
>>> 
>>>> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>>>> <mailto:moiz.ji...@gmail.com>> wrote:
>>>> 
>>>> Thanks Dawid! 
>>>> Yes thats what i was expecting. I'll give it a try.
>>>> 
>>>> When do you expect 1.3.0 stable to be out?
>>>> 
>>>> Moiz
>>>> 
>>>> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz 
>>>> <wysakowicz.da...@gmail.com <mailto:wysakowicz.da...@gmail.com>> wrote:
>>>> Hi,
>>>> 
>>>> This is an expected behaviour. After the "ar" event there still may occur 
>>>> other "ar" event that will also trigger a match.
>>>> To be more generic in all versions prior to 1.3.0 there are two different 
>>>> consuming strategies:
>>>> STRICT (the next operator) - that accepts only if the event occurs 
>>>> directly after the previous 
>>>> SKIP TILL ANY (the followedBy operator) - it accepts any matching event 
>>>> following event if there were already an event that matched this pattern
>>>> Because after "ni" event we could match with some other "ar" events, the 
>>>> match is timeouted after 5 seconds.
>>>> 
>>>> In FLINK-6208 <https://issues.apache.org/jira/browse/FLINK-6208> we 
>>>> introduced third consuming strategy:
>>>> SKIP TILL NEXT(this is the strategy for followedBy right now) - the event 
>>>> does not have to occur directly after the previous one but only one event 
>>>> can be matched
>>>> and you can still use SKIP TILL ANY by using followedByAny. I believe the 
>>>> SKIP TILL NEXT strategy is the one you expected. 
>>>> You can check it on master branch. We did introduce lots of new features 
>>>> and bug

Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Kostas Kloudas
Hi Moiz,

Here are the instructions on how to build Flink from source:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html 
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html>

Kostas

> On Apr 29, 2017, at 7:15 PM, Moiz S Jinia <moiz.ji...@gmail.com> wrote:
> 
> I meant maven dependencies that i can use by generating them from sources.
> 
> On Sat, Apr 29, 2017 at 10:31 PM, Moiz S Jinia <moiz.ji...@gmail.com 
> <mailto:moiz.ji...@gmail.com>> wrote:
> Ok I'll try that. Its just that I'd rather use a stable version.
> Are there any instructions for building binaries from latest sources?
> 
> Moiz
> 
> On Sat, Apr 29, 2017 at 10:09 PM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Hi Moiz,
> 
> The skip-till-next is a big change and backporting it does not seem feasible. 
> Also this would require more general changes to the 1.2 to make it compatible 
> with the previous 1.2 versions.
> 
> If you want you can already use the 1.3 version by downloading the master 
> branch and writing your 
> use-case against that. The changes until the final release are going to be 
> minor hopefully and we can
> always help you adjust your program accordingly.
> 
> Hope this helps,
> Kostas
> 
>> On Apr 29, 2017, at 6:23 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>> wrote:
>> 
>> Oh ok thats a bit far off. Is there any chance of a backport of 
>> https://issues.apache.org/jira/browse/FLINK-6208 
>> <https://issues.apache.org/jira/browse/FLINK-6208> to the 1.2 branch? I 
>> require the SKIP_TILL_NEXT behaviour for a production use case that we want 
>> to use Flink for.
>> 
>> Moiz
>> 
>> On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> The 1.3 is scheduled for the beginning of June.
>> 
>> Cheers,
>> Kostas
>> 
>>> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>>> <mailto:moiz.ji...@gmail.com>> wrote:
>>> 
>>> Thanks Dawid! 
>>> Yes thats what i was expecting. I'll give it a try.
>>> 
>>> When do you expect 1.3.0 stable to be out?
>>> 
>>> Moiz
>>> 
>>> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz 
>>> <wysakowicz.da...@gmail.com <mailto:wysakowicz.da...@gmail.com>> wrote:
>>> Hi,
>>> 
>>> This is an expected behaviour. After the "ar" event there still may occur 
>>> other "ar" event that will also trigger a match.
>>> To be more generic in all versions prior to 1.3.0 there are two different 
>>> consuming strategies:
>>> STRICT (the next operator) - that accepts only if the event occurs directly 
>>> after the previous 
>>> SKIP TILL ANY (the followedBy operator) - it accepts any matching event 
>>> following event if there were already an event that matched this pattern
>>> Because after "ni" event we could match with some other "ar" events, the 
>>> match is timeouted after 5 seconds.
>>> 
>>> In FLINK-6208 <https://issues.apache.org/jira/browse/FLINK-6208> we 
>>> introduced third consuming strategy:
>>> SKIP TILL NEXT(this is the strategy for followedBy right now) - the event 
>>> does not have to occur directly after the previous one but only one event 
>>> can be matched
>>> and you can still use SKIP TILL ANY by using followedByAny. I believe the 
>>> SKIP TILL NEXT strategy is the one you expected. 
>>> You can check it on master branch. We did introduce lots of new features 
>>> and bugfixes to CEP for 1.3.0 version so any comments,
>>> tests or suggestions are welcome.
>>> 
>>> 
>>> Z pozdrowieniami! / Cheers!
>>> 
>>> Dawid Wysakowicz
>>> Data/Software Engineer
>>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>>  <http://getindata.com/>
>>> 
>>> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia <moiz.ji...@gmail.com 
>>> <mailto:moiz.ji...@gmail.com>>:
>>> When using "next", this pattern works fine for the both a match as well as 
>>> a timeout:
>>> 
>>> Pattern<Event, Event> pattern = Pattern.begin("start")
>>> .where(evt -> evt.value.equals("ni"))
>>> .next("last").where(evt -> 
>>> evt.value.equals("ar")).within(Time.seconds(5));
>>> 
>>> 1. "ni" then "ar" within 5 seconds - triggers match
>>> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>>> 
>>> But with "followedBy", this does not behave as expected:
>>> 
>>> Pattern<Event, Event> pattern = Pattern.begin("start")
>>> .where(evt -> evt.value.equals("ni"))
>>> .followedBy("last").where(evt -> 
>>> evt.value.equals("ar")).within(Time.seconds(5));
>>> 
>>> "ni" then "ar" within 5 seconds - triggers match and also triggers timeout.
>>> 
>>> Why is the timeout triggered when using followedBy (when there is a match)?
>>> 
>>> Version - 1.1.5.
>>> 
>>> 
>> 
>> 
> 
> 
> 



Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Kostas Kloudas
Hi Moiz,

The skip-till-next is a big change and backporting it does not seem feasible. 
Also this would require more general changes to the 1.2 to make it compatible 
with the previous 1.2 versions.

If you want you can already use the 1.3 version by downloading the master 
branch and writing your 
use-case against that. The changes until the final release are going to be 
minor hopefully and we can
always help you adjust your program accordingly.

Hope this helps,
Kostas

> On Apr 29, 2017, at 6:23 PM, Moiz S Jinia <moiz.ji...@gmail.com> wrote:
> 
> Oh ok thats a bit far off. Is there any chance of a backport of 
> https://issues.apache.org/jira/browse/FLINK-6208 
> <https://issues.apache.org/jira/browse/FLINK-6208> to the 1.2 branch? I 
> require the SKIP_TILL_NEXT behaviour for a production use case that we want 
> to use Flink for.
> 
> Moiz
> 
> On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> The 1.3 is scheduled for the beginning of June.
> 
> Cheers,
> Kostas
> 
>> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>> wrote:
>> 
>> Thanks Dawid! 
>> Yes thats what i was expecting. I'll give it a try.
>> 
>> When do you expect 1.3.0 stable to be out?
>> 
>> Moiz
>> 
>> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz 
>> <wysakowicz.da...@gmail.com <mailto:wysakowicz.da...@gmail.com>> wrote:
>> Hi,
>> 
>> This is an expected behaviour. After the "ar" event there still may occur 
>> other "ar" event that will also trigger a match.
>> To be more generic in all versions prior to 1.3.0 there are two different 
>> consuming strategies:
>> STRICT (the next operator) - that accepts only if the event occurs directly 
>> after the previous 
>> SKIP TILL ANY (the followedBy operator) - it accepts any matching event 
>> following event if there were already an event that matched this pattern
>> Because after "ni" event we could match with some other "ar" events, the 
>> match is timeouted after 5 seconds.
>> 
>> In FLINK-6208 <https://issues.apache.org/jira/browse/FLINK-6208> we 
>> introduced third consuming strategy:
>> SKIP TILL NEXT(this is the strategy for followedBy right now) - the event 
>> does not have to occur directly after the previous one but only one event 
>> can be matched
>> and you can still use SKIP TILL ANY by using followedByAny. I believe the 
>> SKIP TILL NEXT strategy is the one you expected. 
>> You can check it on master branch. We did introduce lots of new features and 
>> bugfixes to CEP for 1.3.0 version so any comments,
>> tests or suggestions are welcome.
>> 
>> 
>> Z pozdrowieniami! / Cheers!
>> 
>> Dawid Wysakowicz
>> Data/Software Engineer
>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>  <http://getindata.com/>
>> 
>> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia <moiz.ji...@gmail.com 
>> <mailto:moiz.ji...@gmail.com>>:
>> When using "next", this pattern works fine for the both a match as well as a 
>> timeout:
>> 
>> Pattern<Event, Event> pattern = Pattern.begin("start")
>> .where(evt -> evt.value.equals("ni"))
>> .next("last").where(evt -> 
>> evt.value.equals("ar")).within(Time.seconds(5));
>> 
>> 1. "ni" then "ar" within 5 seconds - triggers match
>> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>> 
>> But with "followedBy", this does not behave as expected:
>> 
>> Pattern<Event, Event> pattern = Pattern.begin("start")
>> .where(evt -> evt.value.equals("ni"))
>> .followedBy("last").where(evt -> 
>> evt.value.equals("ar")).within(Time.seconds(5));
>> 
>> "ni" then "ar" within 5 seconds - triggers match and also triggers timeout.
>> 
>> Why is the timeout triggered when using followedBy (when there is a match)?
>> 
>> Version - 1.1.5.
>> 
>> 
> 
> 



Re: CEP timeout occurs even for a successful match when using followedBy

2017-04-29 Thread Kostas Kloudas
The 1.3 is scheduled for the beginning of June.

Cheers,
Kostas

> On Apr 29, 2017, at 6:16 PM, Moiz S Jinia  wrote:
> 
> Thanks Dawid! 
> Yes thats what i was expecting. I'll give it a try.
> 
> When do you expect 1.3.0 stable to be out?
> 
> Moiz
> 
> On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz  > wrote:
> Hi,
> 
> This is an expected behaviour. After the "ar" event there still may occur 
> other "ar" event that will also trigger a match.
> To be more generic in all versions prior to 1.3.0 there are two different 
> consuming strategies:
> STRICT (the next operator) - that accepts only if the event occurs directly 
> after the previous 
> SKIP TILL ANY (the followedBy operator) - it accepts any matching event 
> following event if there were already an event that matched this pattern
> Because after "ni" event we could match with some other "ar" events, the 
> match is timeouted after 5 seconds.
> 
> In FLINK-6208  we 
> introduced third consuming strategy:
> SKIP TILL NEXT(this is the strategy for followedBy right now) - the event 
> does not have to occur directly after the previous one but only one event can 
> be matched
> and you can still use SKIP TILL ANY by using followedByAny. I believe the 
> SKIP TILL NEXT strategy is the one you expected. 
> You can check it on master branch. We did introduce lots of new features and 
> bugfixes to CEP for 1.3.0 version so any comments,
> tests or suggestions are welcome.
> 
> 
> Z pozdrowieniami! / Cheers!
> 
> Dawid Wysakowicz
> Data/Software Engineer
> Skype: dawid_wys | Twitter: @OneMoreCoder
>  
> 
> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia  >:
> When using "next", this pattern works fine for the both a match as well as a 
> timeout:
> 
> Pattern pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .next("last").where(evt -> 
> evt.value.equals("ar")).within(Time.seconds(5));
> 
> 1. "ni" then "ar" within 5 seconds - triggers match
> 2. "ni" then no "ar" within 5 seconds - triggers timeout
> 
> But with "followedBy", this does not behave as expected:
> 
> Pattern pattern = Pattern.begin("start")
> .where(evt -> evt.value.equals("ni"))
> .followedBy("last").where(evt -> 
> evt.value.equals("ar")).within(Time.seconds(5));
> 
> "ni" then "ar" within 5 seconds - triggers match and also triggers timeout.
> 
> Why is the timeout triggered when using followedBy (when there is a match)?
> 
> Version - 1.1.5.
> 
> 



Re: Multiple CEP Patterns

2017-04-28 Thread Kostas Kloudas
Perfect! And let us know how it goes!

Kostas

> On Apr 28, 2017, at 5:04 PM, mclendenin  wrote:
> 
> Ok, I will try using Flink 1.3
> 
> 
> 



Re: Multiple CEP Patterns

2017-04-28 Thread Kostas Kloudas
Yes this is the master branch. 
We have not yet forked the 1.3 branch.

And I do not think there is a better way and I 
am not sure if there can be. Apart from the 
memory leak that is described in the JIRA, 
the different NFA’s cannot share any state, 
so for each one the associated memory overhead
is inevitable I think. We could potentially further 
reduce this overhead, but we cannot eliminate it.


Thanks,
Kostas

> On Apr 28, 2017, at 3:47 PM, mclendenin  wrote:
> 
> I do have a within clause on all the patterns and I am doing CEP.pattern on
> each one. On the output I am adding a Kafka sink. Since all the patterns are
> going to the same sink I was wondering if there was a better way to do it
> rather than having that overhead.
> 
> For the memory issues with 1.2, I do not see a branch for 1.3 in the source
> (https://github.com/apache/flink). Is that just the current master branch? 
> 
> 
> 



Re: Iterating over keys in state backend

2017-04-28 Thread Kostas Kloudas
Hi Ken,

So you have a queue where elements are sorted by timestamp and score, and when 
the time (event time I suppose) passes that of the timestamp of an element, you 
want to fetch the element and:
- if the score is too low, you archive it,
- if the score is OK, you emit it.

If I get it right, then if your stream is keyed you have a queue and an 
“archive” state per key, 
if not, you have a global queue for all elements, which can be seen as a keyed 
stream on a dummy key, right?
By the way, timers in Flink have to be associated with a key, so I suppose that 
if you are using timers you are in the first case (keyed stream).

In this case, why do you need access to the state of all the keys?

Also it may be worth having a look at the CEP operator in the Flink codebase.
There you also have a queue per key, where events are sorted by timestamp, and 
at each watermark, 
elements with timestamps smaller than the watermark are processed.

Hope this helps,
Kostas

> On Apr 28, 2017, at 4:08 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Kostas,
> 
> Thanks for responding. Details in-line below.
> 
>> On Apr 27, 2017, at 1:19am, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> 
>> Hi Ken,
>> 
>> Unfortunately, iterating over all keys is not currently supported.
>> 
>> Do you have your own custom operator (because you mention “from within the 
>> operator…”) or
>> you have a process function (because you mention the “onTimer” method)?
> 
> Currently it’s a process function, but I might be able to just use a regular 
> operator.
> 
>> Also, could you describe your use case a bit more?  You have a periodic 
>> timer per key and when
>> a timer for a given key fires you want to have access to the state of all 
>> the keys?
> 
> The timer bit is because I’m filling an async queue, and thus need to trigger 
> emitting tuples to the operator’s output stream independent of inbound tuples.
> The main problems I’m trying to solve (without requiring a separate scalable 
> DB infrastructure) are:
> 
>  - entries have an associated “earliest processing time”. I don’t want to 
> send these through the system until that time trigger has passed.
>  - entries have an associated “score”. I want to favor processing high 
> scoring entries over low scoring entries.
>  - if an entry’s score is too low, I want to archive it, versus constantly 
> re-evaluate it using the above two factors.
> 
> I’ve got my own custom DB that is working for the above, and scales to target 
> sizes of 1B+ entries per server by using a mixture of RAM and disk.

> But having to checkpoint it isn’t trivial.
> 
> So I thought that if there was a way to (occasionally) iterate over the keys 
> in the state backend, I could get what I needed with the minimum effort.
> 
> But sounds like that’s not possible currently.
> 
> Thanks,
> 
> — Ken
> 
> 
> 
>>> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kkrugler_li...@transpac.com 
>>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>> 
>>> Is there a way to iterate over all of the key/value entries in the state 
>>> backend, from within the operator that’s making use of the same?
>>> 
>>> E.g. I’ve got a ReducingState, and on a timed interval (inside of the 
>>> onTimer method) I need to iterate over all KV state and emit the N “best” 
>>> entries.
>>> 
>>> What’s the recommended approach?
>>> 
>>> Thanks,
>>> 
>>> — Ken
>>> 
>> 
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr



Re: CEP join across events

2017-04-28 Thread Kostas Kloudas
Hi Elias,

I think this is a really interesting suggestion for the case where you do not 
have an “accumulating” 
value. Because imagine that you want to accept the “next” element, if the sum 
of all the previous 
is less than Y. To have a similar syntax with an accumulator, we should add 
more methods with 
additional arguments, right? 

For a first release, we opted for the simplest solution so that we can gather 
more information on 
how people intend to use the new features. Despite that, I really think that it 
is an interesting and 
more intuitive syntax so could you open a JIRA so that we move the discussion 
there, or if you 
want I can open it for you.

Thanks a lot for the suggestion,
Kostas

> On Apr 28, 2017, at 1:09 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> 
> It would be useful if there were a cleaner syntax for specifying 
> relationships between matched events, as in an SQL join, particularly for 
> conditions with a quantifier of one.
> 
> At the moment you have to do something like
> 
> Pattern.
>   .begin[Foo]("first")
> .where( first => first.baz == 1 )
>   .followedBy("next")
> .where({ (next, ctx) =>
>   val first = ctx.getEventsForPattern("first").next
>   first.bar == next.bar && next => next.boo = "x"
> })
> 
> which is not very clean.  It would friendlier if you could do something like:
> 
> Pattern.
>   .begin[Foo]("first")
> .where( first => first.baz == 1 )
>   .followedBy("next")
>     .relatedTo("first", { (first, next) => first.bar == next.bar })
> .where( next => next.boo == "x" )
> 
> 
> 
> On Thu, Apr 27, 2017 at 1:21 AM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Glad that this is not a blocker for you and 
> you are right that we should clarify it better in the documentation.
> 



Re: Multiple CEP Patterns

2017-04-28 Thread Kostas Kloudas
Sorry for the quick followup, but another question, in case the JIRA I sent you 
is not what affects your job, do your patterns have a timeout (the within() 
clause) ?

If not, then also other parts of the system (e.g. the internal state of your 
NFA)
may grow indefinitely.

Kostas

> On Apr 28, 2017, at 9:44 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> Hi!
> 
> I suppose that by memory errors you mean you run out of memory, right?
> 
> Are you using Flink 1.2 or the current master (upcoming Flink 1.3).
> The reason I am asking is because Flink 1.2 suffered from this
> https://issues.apache.org/jira/browse/FLINK-5174 
> <https://issues.apache.org/jira/browse/FLINK-5174>
> which is now fixed in Flink 1.3, and you are more than welcome to 
> try it out, to also help us with testing the new features.
> 
> Now if this is not the case, could you share a bit more details 
> about your program?
> 
> You do a CEP.pattern(input, pattern_x) for each of your patterns? 
> (input is your input stream and x the index of each pattern)
> 
> Thanks,
> Kostas
>  
>> On Apr 27, 2017, at 8:59 PM, mclendenin <marcusc...@gmail.com 
>> <mailto:marcusc...@gmail.com>> wrote:
>> 
>> I'm trying to run multiple independent CEP patterns. They're basic patterns,
>> just one input followed by another and my flink job runs fine when just
>> using 1 pattern. If i try to scale this up to add multiple CEP patterns, 200
>> for example, I start getting memory errors on my cluster. I can definitely
>> add more memory, but I want to know if there is an accepted way to run
>> multiple patterns.
>> 
>> Currently I am just doing a flatselect on the output of CEP.pattern and then
>> making a list of all these inputs and putting a sink on all of the streams.
>> What this is doing is creating a different stream for each pattern, which is
>> causing the output of the Logical Plan in the UI to be too big to even see.
>> Does anybody know of any better way to do this?
>> 
>> 
>> 
> 



Re: Multiple CEP Patterns

2017-04-28 Thread Kostas Kloudas
Hi!

I suppose that by memory errors you mean you run out of memory, right?

Are you using Flink 1.2 or the current master (upcoming Flink 1.3).
The reason I am asking is because Flink 1.2 suffered from this
https://issues.apache.org/jira/browse/FLINK-5174 

which is now fixed in Flink 1.3, and you are more than welcome to 
try it out, to also help us with testing the new features.

Now if this is not the case, could you share a bit more details 
about your program?

You do a CEP.pattern(input, pattern_x) for each of your patterns? 
(input is your input stream and x the index of each pattern)

Thanks,
Kostas
 
> On Apr 27, 2017, at 8:59 PM, mclendenin  wrote:
> 
> I'm trying to run multiple independent CEP patterns. They're basic patterns,
> just one input followed by another and my flink job runs fine when just
> using 1 pattern. If I try to scale this up to add multiple CEP patterns, 200
> for example, I start getting memory errors on my cluster. I can definitely
> add more memory, but I want to know if there is an accepted way to run
> multiple patterns.
> 
> Currently I am just doing a flatselect on the output of CEP.pattern and then
> making a list of all these inputs and putting a sink on all of the streams.
> What this is doing is creating a different stream for each pattern, which is
> causing the output of the Logical Plan in the UI to be too big to even see.
> Does anybody know of any better way to do this?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Multiple-CEP-Patterns-tp12871.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.
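
For reference, a minimal 1.3-style sketch of the set-up described above: one
CEP.pattern() call per pattern on the same keyed input, with the resulting match
streams unioned so that a single sink consumes all alerts. The Event and Alert
case classes, the field names and the two toy patterns are assumptions, and the
sketch does not address the size of the logical plan, which still contains one
CEP operator per pattern.

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(key: String, kind: Int)
case class Alert(first: Event, second: Event)

// "one input followed by another", parameterised by the two event kinds
def simplePattern(a: Int, b: Int) = Pattern
  .begin[Event]("first").where(_.kind == a)
  .followedBy("next").where(_.kind == b)
  .within(Time.minutes(10))

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements(Event("a", 1), Event("a", 2), Event("a", 3), Event("a", 4))

val patterns = Seq(simplePattern(1, 2), simplePattern(3, 4))   // imagine ~200 of these

val alerts = patterns
  .map(p => CEP.pattern(input.keyBy(_.key), p)
    .select(m => Alert(m("first").head, m("next").head)))      // 1.3 select: Map[String, Iterable[Event]]
  .reduce(_ union _)                                           // one stream, one sink

alerts.print()
env.execute("multiple CEP patterns")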



Re: CEP join across events

2017-04-27 Thread Kostas Kloudas
Hi Elias,

Glad that this is not a blocker for you and 
you are right that we should clarify it better in the documentation.

Thanks,
Kostas

> On Apr 27, 2017, at 3:28 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> 
> You are correct.  Apologies for the confusion.  Given that 
> ctx.getEventsForPattern returns an iterator instead of a value and that the 
> example in the documentation deals with summing multiple matches, I got the 
> impression that the call would return all previous matches instead of one at 
> a time for each branch. 
> 
> I suppose it returns an iterator to support patterns where the event has some 
> associated enumerator, like times(), zeroOrMore(), or oneOrMore(), yes?
> 
> Might be helpful to clarify this and point out that the iterator will contain 
> a single value for the common case of match with an enumerator of one, which 
> is the default.
> 
> 
> On Wed, Apr 26, 2017 at 2:15 AM, Kostas Kloudas <k.klou...@data-artisans.com 
> <mailto:k.klou...@data-artisans.com>> wrote:
> Hi Elias,
> 
> If I understand correctly your use case, you want for an input:
> 
> event_1 = (type=1, value_a=K, value_b=X)
> event_2 = (type=2, value_a=K, value_b=X)
> event_3 = (type=1, value_a=K, value_b=Y)
> 
> to get a match:
> 
> event_1, event_2
> 
> and discard event_3, right?
> 
> In this case, Dawid is correct and from a first look at your code, it should 
> work.
> If not, what is the output that you get?
> 
> Kostas
> 
> 
>> On Apr 26, 2017, at 8:39 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com 
>> <mailto:wysakowicz.da...@gmail.com>> wrote:
>> 
>> Hi Elias,
>> 
>> You can do it with 1.3 and IterativeConditions. Method 
>> ctx.getEventsForPattern("foo") returns only those events that were matched 
>> in "foo" pattern in that particular branch.
>> I mean that for a sequence like (type =1, value_b = X); (type=1, value_b=Y); 
>> (type=2, value_b=X) both events of type = 1 create a separate pattern branch 
>> and the event with type = 2 will be checked for a match twice for both of 
>> those branches.
>> 
>> Regards,
>> Dawid
>> 
>> 2017-04-26 7:48 GMT+02:00 Elias Levy <fearsome.lucid...@gmail.com 
>> <mailto:fearsome.lucid...@gmail.com>>:
>> There doesn't appear to be a way to join events across conditions using the 
>> CEP library.
>> 
>> Consider events of the form (type, value_a, value_b) on a stream keyed by 
>> the value_a field.  
>> 
>> Under 1.2 you can create a pattern that for a given value_a, as specified by 
>> the stream key, there is a match if an event of type 1 is followed by an 
>> event of type 2 (e.g. 
>> begin("foo").where(_.type==1).followedBy("bar").where(_.type==2).  But this 
>> will return a match regardless of whether value_b in the first event matches 
>> value_b in the second event.
>> 
>> 1.3 snapshot introduces iterative conditions, but this is insufficient.  In 
>> 1.3 you can do:
>> 
>> begin("foo").where(_.type==1).followedBy("bar").where(
>> (v, ctx) => {
>>v.type == 2 &&
>>ctx.getEventsForPattern("foo").asScala.exists(prev => prev.value_b == 
>> v.value_b)
>> })
>> 
>> This will accept the current event if any previous event had a value_b 
>> that matches the current event. But the matches will include all previous 
>> events, even those that did not match the current event at value_b, instead 
>> of only matching the previous event where value_b equals the current event.
>> 
>> Is there a way to only output the match where the previous event matches the 
>> current event's value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar == 
>> (type=2, value_a=K, value_b=X))?
>> 
>> 
>> 
> 
> 



Re: Iterating over keys in state backend

2017-04-27 Thread Kostas Kloudas
Hi Ken,

Unfortunately, iterating over all keys is not currently supported.

Do you have your own custom operator (because you mention “from within the 
operator…”) or
you have a process function (because you mention the “onTimer” method)?

Also, could you describe your use case a bit more?  You have a periodic timer 
per key and when
a timer for a given key fires you want to have access to the state of all the 
keys?

Thanks,
Kostas
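
To illustrate the limitation, a minimal sketch of what is possible today: with a
ProcessFunction on a keyed stream, both the timers and the state are scoped to a
single key, so onTimer() only sees the state of the key whose timer fired and
cannot walk the entries of all other keys. The Event type, the one-minute
interval and the sum-based ReducingState are assumptions for illustration.

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, value: Long)

class PerKeySumEmitter extends ProcessFunction[Event, Long] {

  // keyed state: one "sum" entry per key, managed by Flink
  lazy val sum: ReducingState[java.lang.Long] =
    getRuntimeContext.getReducingState(new ReducingStateDescriptor[java.lang.Long](
      "sum",
      new ReduceFunction[java.lang.Long] {
        override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
      },
      classOf[java.lang.Long]))

  override def processElement(e: Event, ctx: ProcessFunction[Event, Long]#Context,
                              out: Collector[Long]): Unit = {
    sum.add(e.value)
    // timers, like state, are registered per key
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 60 * 1000)
  }

  override def onTimer(timestamp: Long, ctx: ProcessFunction[Event, Long]#OnTimerContext,
                       out: Collector[Long]): Unit = {
    // only the state of the current key is accessible here; there is no API to
    // iterate over the "sum" entries of the other keys
    out.collect(sum.get())
  }
}

// used as: stream.keyBy(_.key).process(new PerKeySumEmitter)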

> On Apr 27, 2017, at 3:02 AM, Ken Krugler  wrote:
> 
> Is there a way to iterate over all of the key/value entries in the state 
> backend, from within the operator that’s making use of the same?
> 
> E.g. I’ve got a ReducingState, and on a timed interval (inside of the onTimer 
> method) I need to iterate over all KV state and emit the N “best” entries.
> 
> What’s the recommended approach?
> 
> Thanks,
> 
> — Ken
> 



Re: CEP join across events

2017-04-26 Thread Kostas Kloudas
Hi Elias,

If I understand correctly your use case, you want for an input:

event_1 = (type=1, value_a=K, value_b=X)
event_2 = (type=2, value_a=K, value_b=X)
event_3 = (type=1, value_a=K, value_b=Y)

to get a match:

event_1, event_2

and discard event_3, right?

In this case, Dawid is correct and from a first look at your code, it should 
work.
If not, what is the output that you get?

Kostas
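
For reference, a minimal 1.3-style sketch of the per-branch join that Dawid
describes below; the Event case class and its field types are assumptions.
Because ctx.getEventsForPattern("foo") only returns the events already accepted
in the "foo" position of the current branch, the exists() check effectively
joins "bar" to the "foo" event with the same value_b:

import org.apache.flink.cep.scala.pattern.Pattern
import scala.collection.JavaConverters._

case class Event(`type`: Int, value_a: String, value_b: String)

val joinedPattern = Pattern
  .begin[Event]("foo").where(_.`type` == 1)
  .followedBy("bar").where { (v, ctx) =>
    v.`type` == 2 &&
      ctx.getEventsForPattern("foo").asScala.exists(_.value_b == v.value_b)
  }

// applied on the stream keyed by value_a, e.g.
// CEP.pattern(input.keyBy(_.value_a), joinedPattern)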


> On Apr 26, 2017, at 8:39 AM, Dawid Wysakowicz  
> wrote:
> 
> Hi Elias,
> 
> You can do it with 1.3 and IterativeConditions. Method 
> ctx.getEventsForPattern("foo") returns only those events that were matched in 
> "foo" pattern in that particular branch.
> I mean that for a sequence like (type =1, value_b = X); (type=1, value_b=Y); 
> (type=2, value_b=X) both events of type = 1 create a separate pattern branch 
> and the event with type = 2 will be checked for a match twice for both of 
> those branches.
> 
> Regards,
> Dawid
> 
> 2017-04-26 7:48 GMT+02:00 Elias Levy  >:
> There doesn't appear to be a way to join events across conditions using the 
> CEP library.
> 
> Consider events of the form (type, value_a, value_b) on a stream keyed by the 
> value_a field.  
> 
> Under 1.2 you can create a pattern that for a given value_a, as specified by 
> the stream key, there is a match if an event of type 1 is followed by an 
> event of type 2 (e.g. 
> begin("foo").where(_.type==1).followedBy("bar").where(_.type==2).  But this 
> will return a match regardless of whether value_b in the first event matches 
> value_b in the second event.
> 
> 1.3 snapshot introduces iterative conditions, but this is insufficient.  In 
> 1.3 you can do:
> 
> begin("foo").where(_.type==1).followedBy("bar").where(
> (v, ctx) => {
>v.type == 2 &&
>ctx.getEventsForPattern("foo").asScala.exists(prev => prev.value_b == 
> v.value_b)
> })
> 
> This will accept the current event if any previous event had a value_b 
> that matches the current event. But the matches will include all previous 
> events, even those that did not match the current event at value_b, instead 
> of only matching the previous event where value_b equals the current event.
> 
> Is there a way to only output the match where the previous event matches the 
> current event's value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar == 
> (type=2, value_a=K, value_b=X))?
> 
> 
> 



Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-25 Thread Kostas Kloudas
Perfect! 

Thanks a lot for testing it Luis!
And keep us posted if you find anything else.
As you may have seen the CEP library is undergoing heavy refactoring for the 
upcoming release.

Kostas
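
For reference, a minimal sketch of the bounded out-of-orderness assignment Luis
describes, with a nonzero bound; the 30-second figure is an assumption, and
LogEntry, ProcessorHelper, kafkaSource and env come from Luis's own snippets
quoted further down in this thread:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// the watermark trails the largest timestamp seen by 30 seconds, so events
// arriving up to 30 seconds out of order are not treated as late
val withTimestamps = kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(30)) {
    override def extractTimestamp(entry: LogEntry): Long =
      ProcessorHelper.toTimestamp(entry.timestamp).getTime
  })

val stream = env.addSource(withTimestamps)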

> On Apr 25, 2017, at 12:30 PM, Luis Lázaro  wrote:
> 
> Hi Aljoscha and Kostas, thanks in advance.
> 
> Kostas, i followed your recommendation and it seems to be working fine.
> 
> I did:
> - upgraded to 1.3-SNAPSHOT from the master branch.
> - tried assigning timestamps and emitting watermarks using AscendingTimestampExtractor: 
> alerts are correct (late events are not processed as normal ones) and I get a 
> lot of warnings about violated ascending monotonicity (it's OK, my events are not 
> ordered in time).
> - tried assigning timestamps and emitting watermarks using 
> BoundedOutOfOrdernessTimestampExtractor: alerts are correct.
> 
> 
> Thanks a lot, 
> best regards, Luis.
> 
> 
> 



Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-21 Thread Kostas Kloudas
Hi Luis and Aljoscha,

In Flink-1.2 late events were not dropped, but they were processed as normal 
ones.
This is fixed for Flink-1.3 with 
https://issues.apache.org/jira/browse/FLINK-6205.

I would recommend that you switch to the master branch (this will be the upcoming 
Flink-1.3
release) and try it out to see if everything works as expected.

The CEP in Flink-1.3 will come with richer patterns and a lot of bug-fixes and 
by 
trying it out you will also help us stabilize it even further before its 
official release.

Thanks a lot,
Kostas

> On Apr 19, 2017, at 3:28 PM, Luis Lázaro  wrote:
> 
> 
> Hi everyone, 
> i am working on a use case  with CEP and Flink:
> 
> Flink 1.2
> Source is Kafka configured with one single partition.
> Data are syslog standard messages parsed as LogEntry (object with attributes 
> like timestamp, service, severity, etc)
> An event is a LogEntry.
> If two consecutive LogEntry events with severity ERROR (3) and the same service are 
> matched within a 10-minute period, an ErrorAlert must be triggered.
> 
> 
> Although I cannot guarantee the ascending order of events (LogEntry) when 
> consuming from Kafka, I decided to try this implementation:
> Timestamps per Kafka partition 
> 
> 
> 
> //My events provide their own timestamps
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
> 
> //"Watermarks are generated inside the Kafka consumer, per Kafka partition":
> val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new 
> FlinkKafkaConsumer08[LogEntry](
>   parameterTool.getRequired("topic"), new 
> LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
>   parameterTool.getProperties)
> 
> //may not be ascending order
> val kafkaSourceAssignedTimesTamp = 
> kafkaSource.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor[LogEntry] {
>   override def extractAscendingTimestamp(t: LogEntry): Long = {
> ProcessorHelper.toTimestamp(t.timestamp).getTime
>   }
> })
> 
> val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimesTamp)
> 
>  I implemented a pattern like:
> 
> val myPattern =
>  Pattern
>   .begin[LogEntry]("First Event")
>   .subtype(classOf[LogEntry])
>   .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>   .next("Second Event")
>   .subtype(classOf[LogEntry])
>   .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>   .within(Time.minutes(10))
> 
>   This pattern will trigger an alert when two consecutive LogEntry events with 
> severity ERROR and the same service occur (it will generate alerts for each 
> service individually)
> 
>   CEP.pattern(stream
>   .keyBy(_.service),
>   myPattern)
> 
> 
> An alert is made of two LogEntry events:
> 
> ErrorAlert:
> service_name-ERROR-timestamp first event
> service_name-ERROR-timestamp second event
> 
> I am getting alerts like this:
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:57:49
> service_2-3-2017-04-19 07:02:23
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:32:37
> service_2-3-2017-04-19 07:34:06
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:25:04
> service_1-3-2017-04-19 07:29:39
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:29:39
> service_1-3-2017-04-19 07:30:37
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:54:48
> service_2-3-2017-04-19 06:55:03
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:21:11
> service_3-3-2017-04-19 07:24:52
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:30:05
> service_1-3-2017-04-19 07:31:33
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:08:31
> service_3-3-2017-04-19 07:17:42
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:02:30
> service_1-3-2017-04-19 07:06:58
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:03:50
> service_3-3-2017-04-19 07:11:48
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:19:04
> service_3-3-2017-04-19 07:21:25
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:33:13
> service_3-3-2017-04-19 07:38:47
> 
> 
> I also tried this approach:
> bounded out-of-orderness 
> 
> 
> kafkaSource.assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
>   override def extractTimestamp(t: LogEntry): Long = {
> ProcessorHelper.toTimestamp(t.timestamp).getTime
>   }
> })
> 
> Time.seconds(0) —> if I set it like this, do I prevent the events from being 
> delivered late?
> 
> But I get the same problem as described above:
> 
> ……
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> 

Re: Window Functions and Empty Panes

2017-04-18 Thread Kostas Kloudas
I forgot to say that timers are fault-tolerant. You set them, and Flink takes 
care of checkpointing and
restoring them after failure. The flag will also be fault-tolerant as, I 
suppose, you will use Flink’s keyed state.

For more info, you can check the ProcessFunction documentation that Konstantin 
provided.
There, the example uses a value state to hold the counter; you can do something 
similar to keep the flag.
Keep in mind that the state will already be scoped by key so you do not have to 
worry about that
either.

Kostas
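
A minimal sketch of the approach described above, assuming a hypothetical Event
type and a period T of one hour: a keyed ValueState flag records whether the
periodic timer has already been started for the current key, and onTimer()
re-registers itself for "now + T". Both the flag and the timers are checkpointed
and restored by Flink.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, payload: String)

class PeriodicEmitter extends ProcessFunction[Event, String] {

  val periodMillis: Long = 60 * 60 * 1000L   // T = 1 hour, an assumption

  // keyed flag: has the periodic timer been started for this key?
  lazy val timerStarted: ValueState[java.lang.Boolean] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("timer-started", classOf[java.lang.Boolean]))

  override def processElement(e: Event, ctx: ProcessFunction[Event, String]#Context,
                              out: Collector[String]): Unit = {
    if (timerStarted.value() == null) {        // first element seen for this key
      ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + periodMillis)
      timerStarted.update(true)
    }
    // ... update whatever per-key state the periodic emission is based on ...
  }

  override def onTimer(timestamp: Long, ctx: ProcessFunction[Event, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    out.collect("periodic emission for this key")   // emit whatever is needed
    // restart the timer: the next firing is at "now + T"
    ctx.timerService().registerProcessingTimeTimer(timestamp + periodMillis)
  }
}

// used as: stream.keyBy(_.key).process(new PeriodicEmitter)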

> On Apr 18, 2017, at 11:11 PM, Kostas Kloudas <k.klou...@data-artisans.com> 
> wrote:
> 
> No problem! Glad I could help!
> 
> Kostas
> 
>> On Apr 18, 2017, at 11:01 PM, Ryan Conway <ryanmackenziecon...@gmail.com 
>> <mailto:ryanmackenziecon...@gmail.com>> wrote:
>> 
>> Hi Kostas,
>> 
>> Re restarting: I missed that ProcessFunction.OnTimerContext extends 
>> ProcessFunction.Context! Until now my thought was that OnTimerContext did 
>> not provide a means of restarting a timer.
>> 
>> Re initial timer, you're right, I'll just need to track a boolean in a state 
>> variable that notes whether or not the timer has been initialized. What I am 
>> not confident about is how to manage timer recovery after a node failure; I 
>> imagine it will make sense to not track this variable. I will do more 
>> research and cross that bridge when I get there.
>> 
>> So I think a process function will work just fine, here. Thank you again for 
>> your time, Kostas and Konstantin.
>> 
>> Ryan
>> 
>> On Tue, Apr 18, 2017 at 12:07 PM, Kostas Kloudas 
>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
>> Hi Ryan,
>> 
>> “A periodic window like this requires the ability to start a timer without 
>> an element and to restart a timer when fired.”
>> 
>> For the second part, i.e. “to restart a timer when fired”, you can 
>> re-register the timer in the onTimer() method (set a 
>> new timer for “now + T"), so that the next one fires after T time units, 
>> where T is your period.
>> 
>> For the first part, where you set the initial timer for a window, this needs 
>> to have a first element right? If not, how
>> do you know the key for which to set the timer? Are all the keys known in 
>> advance?
>> 
>> Kostas
>> 
>> 
>> 
>>> On Apr 18, 2017, at 8:35 PM, Ryan Conway <ryanmackenziecon...@gmail.com 
>>> <mailto:ryanmackenziecon...@gmail.com>> wrote:
>>> 
>>> A periodic window like this requires the ability to start a timer without 
>>> an element and to restart a timer when fired.
>> 
>> 
> 



<    1   2   3   4   >