Why not open a JIRA and work on adding the debug 
statements that you consider useful?

This could help the next user that faces the same issues ;)

Kostas

> On Mar 7, 2018, at 3:29 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> Aah, yes, we never had a sink state, so we never came across a case where it 
> was exercised. When the range expires, it is a prune rather than a stop 
> state ( we were expecting it to be a stop state ), which is somewhat 
> misleading if we hold a stop state to be one " that invalidates a partial 
> match ", whatever the reason may be.
> 
> Again, I would also advise ( though not a biggy ) that strategic debug 
> statements in the CEP core would help folks see what actually happens. We 
> instrumented the code to follow the construction of the NFA, and that was 
> very helpful. 
> 
> On Wed, Mar 7, 2018 at 9:23 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hi Vishal,
> 
> A stopState is a state that invalidates a partial match, e.g. 
> a.notFollowedBy(b).followedBy(c). 
> If you have an “a” and then you see a “b”, then you invalidate the pattern.
> 
> A finalState is the one where a match has been found.
> 
> Kostas
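
To make the stop/final distinction concrete, here is a minimal sketch in plain Java. It is not FlinkCEP code: the class, enum and method names are hypothetical, and it only hand-rolls the single pattern "a, not followed by b, followed by c" with relaxed contiguity, so it says nothing about how the real NFA is implemented.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical toy matcher, not FlinkCEP: it tracks one partial match for
// the pattern "a, not followed by b, followed by c".
class StopStateSketch {
    enum Outcome { NOT_STARTED, PARTIAL, STOP, FINAL }

    static Outcome run(List<String> events) {
        boolean seenA = false;
        for (String event : events) {
            if (!seenA) {
                seenA = event.equals("a");   // an "a" starts a partial match
            } else if (event.equals("b")) {
                return Outcome.STOP;         // a "b" invalidates the partial match
            } else if (event.equals("c")) {
                return Outcome.FINAL;        // a "c" completes the match
            }
            // any other event is ignored (relaxed contiguity)
        }
        return seenA ? Outcome.PARTIAL : Outcome.NOT_STARTED;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "x", "c")));
        System.out.println(run(Arrays.asList("a", "b", "c")));
    }
}
```

In this toy, a partial match that is merely still open at the end of the input stays PARTIAL; it reaches STOP only when an event actively invalidates it.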
> 
> 
>> On Mar 7, 2018, at 3:20 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> 
>> Absolutely. For one, a simple "m out of n true conditions", where n is 
>> defined by the range, is a little under-optimized: just using times(m) will 
>> not short-circuit the partial patterns until the time range has elapsed, 
>> even if there is no way m true conditions can still be achieved ( we have 
>> already had more than n-m false conditions ). That makes sense, as we have 
>> defined a within() condition predicated on n. 
>> 
>> I think the way one would do it is to use an iterative condition, look at 
>> all events ( including the false ones, but that can be expensive ) and stop 
>> the pattern. One question I had: an NFA can be in a FinalState or a 
>> StopState. 
>> 
>> What would constitute a StopState ? 
>> 
>> On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> Hi Vishal,
>> 
>> Thanks a lot for sharing your experience and the potential caveats to 
>> consider when specifying your pattern.
>> 
>> I agree that there is room for improvement when it comes to the state 
>> checkpointed in Flink. We already have some ideas but still, as you also 
>> said, the bulk of the space consumption comes from the pattern definition, 
>> so it would be nice if more people did the same, i.e. shared their 
>> experience and, why not, compiled a guide of things to avoid and put it 
>> alongside the rest of the FlinkCEP documentation.
>> 
>> What do you think?
>> 
>> Kostas
>> 
>> 
>> 
>>> On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> 
>>> Hello all, there were recent changes to the Flink master that I pulled in, 
>>> and they seem to have solved our issue.  
>>> 
>>> A few points: 
>>> 
>>> * CEP is heavy: the NFA transition matrix is kept as state, which can 
>>> possibly be n^2 ( worst case ) and can easily blow up space requirements. 
>>> The after-match skip strategy is likely to play a crucial role in keeping 
>>> the state lean: 
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy
>>> In our case we do not require partial matches within a match to 
>>> contribute to another potential match ( noise for us ), and thus 
>>> SKIP_PAST_LAST_EVENT was used, which on a match will prune the 
>>> SharedBuffer ( almost reset it ). 
>>> 
>>> * The argument that the pattern events should be lean holds even more in 
>>> CEP, due to the potential exponential increase in space requirements. 
>>> 
>>> * The nature of the pattern will require consideration if state does blow 
>>> up for you.
>>> 
>>> Apart from that, I am still not sure why toString() on SharedBuffer was 
>>> called, leading to the OOM to begin with.
>>> 
>>> 
>>> 
>>> On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> We could not recreate it in a controlled setup, but here are a few notes 
>>> that we have gathered on a simple "times(n).within(..)" pattern.
>>> 
>>> In the case where the Event does not create a Final or Stop state:
>>> 
>>> * As an NFA processes an Event, the NFA mutates if the Event is a true 
>>> Event. Each computation is a counter that keeps track of a partial match, 
>>> and each true Event extends the partial matches that already exist. 
>>> Essentially, for n Events where each Event is a true Event, there will be 
>>> roughly n-1 computations, each representing an Event from 1 to n-1 ( so 1, 
>>> the first, will have n-1 events in its partial match, 2 has n-2 events and 
>>> so on, and n-1 has the last event as its partial match ).
>>> 
>>> * If the WM progresses beyond the ts of the 1st computation, that partial 
>>> match is pruned.
>>> 
>>> * The code makes sure that a SharedBufferEntry is pruned only if the count 
>>> of Edges originating from it reduces to 0 ( the internalRemove(), which uses 
>>> a Stack ), which should happen as the WM keeps progressing to the nth 
>>> element for unfulfilled patterns. A "null" ( not a fan ) event is used to 
>>> establish a WM progression. 
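
The bookkeeping described in these notes can be sketched roughly as follows, in plain Java. This is a simplified model and not the FlinkCEP implementation: the class and method names are hypothetical, every true event is assumed to open one partial match, events are assumed to arrive in timestamp order, and a partial match is assumed to be pruned once the watermark is at least one window past its start.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of open partial matches under a within() window.
class PartialMatchPruningSketch {
    private final long windowMillis;
    // Start timestamps of the open partial matches, oldest first.
    private final Deque<Long> partialMatchStarts = new ArrayDeque<>();

    PartialMatchPruningSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Assumption: each true event opens a new partial match.
    void onTrueEvent(long timestamp) {
        partialMatchStarts.addLast(timestamp);
    }

    // Drops every partial match whose window has expired at this watermark.
    void onWatermark(long watermark) {
        while (!partialMatchStarts.isEmpty()
                && watermark - partialMatchStarts.peekFirst() >= windowMillis) {
            partialMatchStarts.removeFirst();
        }
    }

    int openPartialMatches() {
        return partialMatchStarts.size();
    }

    public static void main(String[] args) {
        PartialMatchPruningSketch sketch = new PartialMatchPruningSketch(10);
        for (long ts = 0; ts < 5; ts++) {
            sketch.onTrueEvent(ts);
        }
        sketch.onWatermark(11);
        System.out.println(sketch.openPartialMatches());
    }
}
```

The point of the model is only that, without a match or a stop, nothing shrinks the set of open partial matches except watermark progress.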
>>> 
>>> 
>>> In the case where there is a FinalState ( and we skipToFirstAfterLast ): 
>>> 
>>> * The NFA will prune ( release ) all partial matches, prune the shared 
>>> buffer and emit the current match. The computations should now be empty.
>>> 
>>> There is a lot to it, but is that roughly what is done in that code ?
>>> 
>>> 
>>> 
>>> A few questions: 
>>> 
>>> * What we have seen is that the call to the toString method of SharedBuffer 
>>> is where the OOM occurs. In the code there is no call to a Log, so we are 
>>> not sure why or by whom that method is called. Surely that is not part of 
>>> the serialization/deserialization routine, or is it ( very surprising if it 
>>> is ) ? 
>>> * There is no out-of-the-box implementation of an "m out of n" pattern 
>>> match. We have to resort to n in range ( m * time series slot ), which we 
>>> do. This is fine, but what it does not allow is an optimization where, if 
>>> enough false conditions are seen, one can prune. Simply speaking, once more 
>>> than n-m falses have been seen there is no way that out of n there will 
>>> ever be m trues, and thus the SharedBuffer can be pruned to the last true 
>>> seen ( very akin to skipToFirstAfterLast ).  
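
The arithmetic behind that pruning idea, as a minimal sketch in plain Java. The class and method names are hypothetical and nothing here is a FlinkCEP API; it only checks whether m trues are still reachable in a series of n slots. Note that the match becomes impossible once more than n-m falses have been seen, since with exactly n-m falses the remaining slots could still all be true.

```java
// Hypothetical helper for an "m out of n" check, not part of FlinkCEP.
class MOutOfNSketch {

    // Returns true while m true conditions can still be reached within n slots.
    static boolean stillReachable(int n, int m, int truesSeen, int falsesSeen) {
        int remainingSlots = n - truesSeen - falsesSeen;
        return truesSeen + remainingSlots >= m;
    }

    public static void main(String[] args) {
        // 20 out of 25: five falses still leave room for 20 trues, six do not.
        System.out.println(stillReachable(25, 20, 0, 5));
        System.out.println(stillReachable(25, 20, 0, 6));
    }
}
```

A partial match for which this returns false could be dropped immediately instead of waiting for the within() window to expire.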
>>> 
>>> We will keep instrumenting the code ( which apart from the null message is 
>>> easily understandable ) but would love to hear your feedback. 
>>> 
>>> 
>>> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>> Thanks a lot Vishal! 
>>> 
>>> We are looking forward to a test case that reproduces the failure.
>>> 
>>> Kostas
>>> 
>>> 
>>>> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> 
>>>> This is the pattern. Will create a test case. 
>>>> /**
>>>>  *
>>>>  * @param condition a single condition applied as the acceptance criterion
>>>>  * @param params defining the bounds of the pattern.
>>>>  * @param <U> the element in the stream
>>>>  * @return compiled pattern along with the params.
>>>>  */
>>>> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>>>>         SimpleCondition<U> condition,
>>>>         RelaxedContiguityWithinTime params,
>>>>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>>>>         String patternId) {
>>>>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>>>>     // The first true condition starts the pattern.
>>>>     Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
>>>>     // Any further true conditions follow with relaxed contiguity.
>>>>     if (params.elementCount > 1) {
>>>>         pattern = pattern.followedBy(REST).where(condition).times(params.elementCount - 1);
>>>>     }
>>>> 
>>>>     return new RelaxedContiguousPattern<U>(
>>>>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>>>>             params,
>>>>             params.elementCount > 1,
>>>>             params.period.duration,
>>>>             mapFunc,
>>>>             patternId
>>>>     );
>>>> }
>>>> 
>>>> 
>>>> 
>>>> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>>> Could you provide some example to reproduce the case? Or the Pattern that 
>>>> you are using? It would help track down the issue.
>>>> 
>>>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> >
>>>> > I have pulled in the flink master cep library and the runtime ( the 
>>>> > cluster ) is configured to work against the latest and greatest. This 
>>>> > does not happen with smaller range patterns ( 3 out of 5 , 1 of 3 etc) 
>>>> > but is always an issue when it is a larger range ( 20 out of 25 with 
>>>> > range of 8 hours ). Does that make sense?
>>>> >
>>>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>>> > This problem sounds very similar to this one, which was fixed for 1.4.1 
>>>> > and 1.5.0:
>>>> > https://issues.apache.org/jira/browse/FLINK-8226 
>>>> >
>>>> > Could you check if that helps with your problem too?
>>>> >
>>>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > >
>>>> > > I have flink master CEP library code imported to  a 1.4 build.
>>>> > >
>>>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > A new one
>>>> > >
>>>> > > java.lang.OutOfMemoryError: Java heap space
>>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>>> > > .
>>>> > > .
>>>> > > .
>>>> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop ?
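
One way mutually recursive toString() calls can exhaust the heap without any actual loop is sketched below in plain Java. Whether this is what happened inside SharedBuffer is not confirmed anywhere in this thread; the trace only shows SharedBufferEntry.toString and SharedBufferEdge.toString calling each other. The Node class is hypothetical. The sketch builds a small graph in which each node has two edges to the same next node, so the printed form repeats every shared node and roughly doubles in length per level.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical graph, not the SharedBuffer classes.
class RecursiveToStringSketch {

    static final class Node {
        final String name;
        final List<Node> edges = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(name).append('[');
            for (Node target : edges) {
                sb.append(target);   // recurses into every edge target
            }
            return sb.append(']').toString();
        }
    }

    // Builds `depth` nodes in a chain; each node has two edges to the next one.
    static Node chain(int depth) {
        Node head = new Node("n");
        Node current = head;
        for (int i = 1; i < depth; i++) {
            Node next = new Node("n");
            current.edges.add(next);
            current.edges.add(next);
            current = next;
        }
        return head;
    }

    public static void main(String[] args) {
        // Ten nodes in memory, but the string is thousands of characters long.
        System.out.println(chain(10).toString().length());
    }
}
```

With d nodes the string has 3 * (2^d - 1) characters in this toy, so a few dozen shared nodes are already enough to run out of heap.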
>>>> > >
>>>> > >
>>>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > It happens when it looks to throw an exception and calls 
>>>> > > sharedBuffer.toString(), because of this check:
>>>> > >
>>>> > >
>>>> > > int id = sharedBuffer.entryId;
>>>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + 
>>>> > > sharedBuffer);
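
A note on a check written this way: in Java the message argument is evaluated before the method is called, so the string concatenation (and with it toString()) runs on every call, whether or not the condition fails. The sketch below shows the difference with a supplier-based variant. Both helper methods and the Expensive class are hypothetical and written only for this illustration; they are not Flink's Preconditions.

```java
import java.util.function.Supplier;

// Hypothetical helpers, not Flink's Preconditions class.
class LazyMessageSketch {
    static int toStringCalls = 0;

    // Stand-in for an object whose toString() is costly.
    static final class Expensive {
        @Override
        public String toString() {
            toStringCalls++;
            return "expensive";
        }
    }

    // Eager form: the caller has already built the message string.
    static void checkStateEager(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Lazy form: the message is built only if the check fails.
    static void checkStateLazy(boolean condition, Supplier<String> message) {
        if (!condition) {
            throw new IllegalStateException(message.get());
        }
    }

    public static void main(String[] args) {
        Expensive entry = new Expensive();
        checkStateEager(true, "Could not find id for entry: " + entry);
        System.out.println("toString() calls after eager check: " + toStringCalls);
        checkStateLazy(true, () -> "Could not find id for entry: " + entry);
        System.out.println("toString() calls after lazy check: " + toStringCalls);
    }
}
```

That would explain toString() showing up under serialize() even though nothing is logged and no exception is thrown.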
>>>> > >
>>>> > >
>>>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > The watermark has not moved for this pattern to succeed ( or 
>>>> > > otherwise ). The issue, though, is that it is pretty early in the pipe 
>>>> > > ( like within a minute ). I am replaying from a Kafka topic, but the 
>>>> > > keyed operator has emitted no more than 1500-plus elements to 
>>>> > > SelectCEPOperator ( very visible on the UI ), so I am sure not enough 
>>>> > > elements have been added to the SharedBuffer to create memory stress.
>>>> > >
>>>> > > The nature of the input stream is that events are pushed out with a 
>>>> > > specific timestamp ( it is a time series and the timestamp is the 
>>>> > > beginning of the time slot ), as in one will have a bunch of elements 
>>>> > > that have a constant timestamp till the next batch appears.
>>>> > >
>>>> > > A batch, though, does not have more elements than the number of keys ( 
>>>> > > 600 ).
>>>> > >
>>>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>> > > This is a pretty simple pattern, as in I hardly have 1500 elements ( 
>>>> > > across 600 keys at the max ) put in, and though I have a pretty wide 
>>>> > > range, as in I am looking at a relaxed pattern ( like 40 true 
>>>> > > conditions in 6 hours ), I get this. I have EventTime turned on.
>>>> > >
>>>> > >
>>>> > > java.lang.OutOfMemoryError: Java heap space
>>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>>> > >       at java.lang.String.valueOf(String.java:2994)
>>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>> > >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>>>> > > .
>>>> > > .
>>>> > > .
>>>> > >
>>>> > > Has anyone seen this issue ?
