Hi Vishal,

A stopState is a state that invalidates a partial match. Take, e.g., 
a.notFollowedBy(b).followedBy(c): 
if you have an “a” and then you see a “b”, the partial match is invalidated.

A finalState is the one where a match has been found.
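
To make the distinction concrete, here is a minimal plain-Java sketch (no Flink 
dependency) of a toy automaton for a.notFollowedBy(b).followedBy(c). The class 
StopStateSketch, the Outcome enum and the string-encoded events are made up for 
illustration; this is not how the FlinkCEP NFA is implemented.

```java
import java.util.Arrays;
import java.util.List;

/** Toy automaton for a.notFollowedBy(b).followedBy(c); not FlinkCEP's NFA. */
final class StopStateSketch {

    /** Where the single partial match ends up after consuming the events. */
    enum Outcome { NO_MATCH_STARTED, PARTIAL, STOPPED, MATCHED }

    /** Feeds the events one by one and reports the resulting outcome. */
    static Outcome run(List<String> events) {
        boolean seenA = false;
        for (String event : events) {
            if (!seenA) {
                // Nothing started yet: only an "a" opens a partial match.
                seenA = event.equals("a");
            } else if (event.equals("b")) {
                // Stop state: the forbidden event invalidates the partial match.
                return Outcome.STOPPED;
            } else if (event.equals("c")) {
                // Final state: the pattern is complete, a match has been found.
                return Outcome.MATCHED;
            }
            // Any other event is skipped (relaxed contiguity).
        }
        return seenA ? Outcome.PARTIAL : Outcome.NO_MATCH_STARTED;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "x", "c")));
        System.out.println(run(Arrays.asList("a", "b", "c")));
        System.out.println(run(Arrays.asList("a", "x")));
    }
}
```

The first input reaches the final state, the second hits the stop state as soon 
as the “b” arrives, and the third is still a partial match.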

Kostas

> On Mar 7, 2018, at 3:20 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> Absolutely. For one, a simple "m out of n true conditions" pattern, where n 
> is defined by the time range, is a little under-optimized: just using times(m) 
> will not short-circuit the partial matches until the time range has elapsed, 
> even when there is no longer any way to reach m true conditions ( we have 
> already seen more than n-m false conditions ). That makes sense, as we have 
> defined a within() condition predicated on n. 
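
The short circuit described above can be sketched in plain Java, independent of 
Flink. MOutOfNTracker and its Status enum are hypothetical names, and the sketch 
assumes exactly one event per time slot. With n slots in total, m true events 
become unreachable once the number of false events exceeds n - m.

```java
/** Tracks one "m true events out of n slots" window; a sketch, not FlinkCEP code. */
final class MOutOfNTracker {

    enum Status { OPEN, MATCHED, IMPOSSIBLE }

    private final int m;
    private final int n;
    private int trues;
    private int falses;

    MOutOfNTracker(int m, int n) {
        if (m <= 0 || m > n) {
            throw new IllegalArgumentException("need 0 < m <= n");
        }
        this.m = m;
        this.n = n;
    }

    /** Records the outcome of the next slot and returns the window's status. */
    Status accept(boolean conditionHolds) {
        if (conditionHolds) {
            trues++;
        } else {
            falses++;
        }
        if (trues >= m) {
            return Status.MATCHED;
        }
        // At most n - falses slots can still be true in total. If that is
        // below m (i.e. falses > n - m), the partial match can be dropped.
        if (falses > n - m) {
            return Status.IMPOSSIBLE;
        }
        return Status.OPEN;
    }

    public static void main(String[] args) {
        MOutOfNTracker tracker = new MOutOfNTracker(3, 5);
        System.out.println(tracker.accept(false));
        System.out.println(tracker.accept(false));
        System.out.println(tracker.accept(false));
    }
}
```

For 3 out of 5, the window stays open after two false events and becomes 
impossible on the third, two slots before the time range would have expired.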
> 
> I think the way to do it is to use an iterative condition, look at all 
> events ( including the ones that evaluate to false, which can be expensive ) 
> and stop the pattern. One question I had: an NFA can be in a FinalState or a 
> StopState. 
> 
> What would constitute a StopState ? 
> 
> On Wed, Mar 7, 2018 at 8:47 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hi Vishal,
> 
> Thanks a lot for sharing your experience and the potential caveats to 
> consider when specifying your pattern.
> 
> I agree that there is room for improvement when it comes to the state 
> checkpointed in Flink. We already have some ideas but still, as you also 
> said, the bulk of the space consumption comes from the pattern definition, 
> so it would be nice if more people did the same, i.e. shared their 
> experience and, why not, compiled a guide of things to avoid that we could 
> put alongside the rest of the FlinkCEP documentation.
> 
> What do you think?
> 
> Kostas
> 
> 
> 
>> On Mar 7, 2018, at 2:34 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> 
>> Hello all,  There were recent changes to the flink master that I pulled in 
>> and that seems to have solved our issue.  
>> 
>> Few points 
>> 
>> * CEP is heavy: the NFA transition matrix is kept as state, which can 
>> possibly be n^2 ( worst case ) and can easily blow up space requirements. The 
>> after match skip strategy is likely to play a crucial role in keeping the 
>> state lean 
>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy>.
>> In our case we do not require partial matches within a match to contribute 
>> to another potential match ( noise for us ), and thus SKIP_PAST_LAST_EVENT 
>> was used, which on a match will prune the SharedBuffer ( almost reset it ). 
>> * The argument that the pattern events should be lean holds much more in CEP 
>> due to the potential exponential increase in space requirements. 
>> 
>> * The nature of the pattern will require consideration if state does blow up 
>> for you.
>> 
>> Apart from that, I am still not sure why toString() on SharedBuffer was 
>> called to get an OOM to begin with.
>> 
>> 
>> 
>> On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> We could not recreate it in a controlled setup, but here are a few notes that 
>> we have gathered on a simple "times(n).within(..)" pattern.
>> 
>> In case the Event does not create a Final or Stop state
>> 
>> * As an NFA processes an Event, the NFA mutates if the Event is a true Event. 
>> Each computation is a counter that keeps track of one partial match; each 
>> true Event extends the already existing partial match of that computation. 
>> Essentially, for n Events, if each Event is a true one, there will be roughly 
>> n-1 computations, one starting at each Event from 1 to n-1 ( so 1, the first, 
>> will have n-1 events in its partial match, 2 has n-2 events and so on, and 
>> n-1 has only the last event as a partial match ).
>> 
>> * If the WM ( watermark ) progresses beyond the ts of the 1st computation, 
>> that partial match is pruned.
>> 
>> * The code makes sure that a SharedBufferEntry is pruned only if the count of 
>> Edges originating from it drops to 0 ( the internalRemove() which uses a 
>> Stack ), which should happen as the WM keeps progressing to the nth element 
>> for unfulfilled patterns. A "null" ( not a fan ) event is used to establish 
>> WM progression. 
>> 
>> 
>> In case there is a FinalState  ( and we skipToFirstAfterLast ) 
>> 
>> * The NFA will prune ( release ) all partial matches, prune the shared 
>> buffer and emit the current match. The computations should now be empty.
>> 
>> There is a lot to it, but is that roughly what is done in that code ?
>> 
>> 
>> 
>> Few questions. 
>> 
>> * What we have seen is that the call to the toString method of SharedBuffer 
>> is where the OOM occurs. In the code there is no call to a logger, so we are 
>> not sure why, or by whom, that method is called. Surely that is not part of 
>> the serialization/deserialization routine, or is it ( very surprising if it 
>> is ) ? 
>> * There is no out of the box implementation of an "m out of n" pattern match. 
>> We have to resort to n in range ( m * time series slot ) which we do. This 
>> is fine, but what it does not allow is an optimization where one can prune 
>> once enough false conditions are seen. Simply speaking, if more than n-m 
>> false conditions have been seen, there is no way that out of n there will 
>> ever be m trues, and thus the SharedBuffer can be pruned to the last true 
>> seen ( very akin to skipToFirstAfterLast ).  
>> 
>> We will keep instrumenting the code ( which apart from the null message is 
>> easily understandable ) but would love to hear your feedback. 
>> 
>> 
>> 
>> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> Thanks a lot Vishal! 
>> 
>> We are looking forward to a test case that reproduces the failure.
>> 
>> Kostas
>> 
>> 
>>> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> 
>>> This is the pattern. Will create a test case. 
>>> /**
>>>  * @param condition a single condition applied as the acceptance criterion
>>>  * @param params defines the bounds of the pattern
>>>  * @param <U> the element in the stream
>>>  * @return the compiled pattern along with the params
>>>  */
>>> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>>>         SimpleCondition<U> condition,
>>>         RelaxedContiguityWithinTime params,
>>>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>>>         String patternId) {
>>>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>>>     // The first true event opens the pattern.
>>>     Pattern<U, ?> pattern = Pattern.<U>begin(START).where(condition);
>>>     // The remaining elementCount - 1 true events follow with relaxed contiguity.
>>>     if (params.elementCount > 1) {
>>>         pattern = pattern.followedBy(REST).where(condition).times(params.elementCount - 1);
>>>     }
>>> 
>>>     return new RelaxedContiguousPattern<U>(
>>>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>>>             params,
>>>             params.elementCount > 1,
>>>             params.period.duration,
>>>             mapFunc,
>>>             patternId);
>>> }
>>> 
>>> 
>>> 
>>> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>> Could you provide some example to reproduce the case? Or the Pattern that 
>>> you are using? It would help track down the issue.
>>> 
>>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> >
>>> > I have pulled in the flink master cep library and the runtime ( the 
>>> > cluster ) is configured to work against the latest and greatest. This 
>>> > does not happen with smaller range patterns ( 3 out of 5, 1 of 3 etc. ) 
>>> > but is always an issue when it is a larger range ( 20 out of 25 with a 
>>> > range of 8 hours ). Does that make sense?
>>> >
>>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>> > This problem sounds very similar to this one that was fixed for 1.4.1 and 
>>> > 1.5.0:
>>> > https://issues.apache.org/jira/browse/FLINK-8226
>>> >
>>> > Could you check if that helps with your problem too?
>>> >
>>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > >
>>> > > I have the flink master CEP library code imported into a 1.4 build.
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > A new one
>>> > >
>>> > > java.lang.OutOfMemoryError: Java heap space
>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >       at java.lang.String.valueOf(String.java:2994)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>> > >       at java.lang.String.valueOf(String.java:2994)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >       at java.lang.String.valueOf(String.java:2994)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>> > > .
>>> > > .
>>> > > .
>>> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop ?
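
The trace alternates between SharedBufferEntry.toString() and 
SharedBufferEdge.toString(). One plausible reading (an assumption, not verified 
against the Flink source) is that no infinite loop is needed for this to run out 
of heap: if an entry prints all of its edges and each edge prints its whole 
target entry, every shared predecessor is printed once per path that reaches it. 
The sketch below uses made-up stand-in classes Entry and Edge to show the 
resulting growth.

```java
import java.util.ArrayList;
import java.util.List;

/** Shows how mutually recursive toString() calls blow up on a shared graph. */
final class ToStringBlowupSketch {

    /** Stand-in for a buffer entry; toString() prints every outgoing edge. */
    static final class Entry {
        final String name;
        final List<Edge> edges = new ArrayList<>();

        Entry(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("Entry(").append(name);
            for (Edge edge : edges) {
                sb.append(", ").append(edge); // calls Edge.toString()
            }
            return sb.append(')').toString();
        }
    }

    /** Stand-in for an edge; toString() prints the whole target entry. */
    static final class Edge {
        final Entry target;

        Edge(Entry target) {
            this.target = target;
        }

        @Override
        public String toString() {
            return "Edge(" + target + ")"; // calls Entry.toString()
        }
    }

    /** Builds levels + 1 entries; each entry has two edges to its predecessor. */
    static Entry chain(int levels) {
        Entry previous = new Entry("e0");
        for (int i = 1; i <= levels; i++) {
            Entry current = new Entry("e" + i);
            current.edges.add(new Edge(previous));
            current.edges.add(new Edge(previous));
            previous = current;
        }
        return previous;
    }

    public static void main(String[] args) {
        for (int levels = 4; levels <= 16; levels += 4) {
            int length = chain(levels).toString().length();
            System.out.println((levels + 1) + " entries -> " + length + " chars");
        }
    }
}
```

Each extra level more than doubles the length of the printed string, so a graph 
with only a few dozen entries and some sharing is enough to exhaust the heap.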
>>> > >
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > It happens when it looks to throw an exception and calls 
>>> > > sharedBuffer.toString(), because of the check:
>>> > >
>>> > >
>>> > > int id = sharedBuffer.entryId;
>>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
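
One detail about a check written this way: Java evaluates method arguments 
before the call, so the string concatenation, and with it toString(), runs on 
every invocation, whether or not the check fails. The sketch below shows this 
with a counter. checkStateEager and checkStateLazy are hypothetical helpers 
written for this example, not the Flink Preconditions API.

```java
import java.util.function.Supplier;

/** Compares an eagerly built error message with a lazily built one. */
final class EagerMessageSketch {

    /** Counts toString() calls so the two call styles can be compared. */
    static final class Expensive {
        int toStringCalls;

        @Override
        public String toString() {
            toStringCalls++;
            return "expensive";
        }
    }

    /** Hypothetical helper: the caller has already built the message. */
    static void checkStateEager(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /** Hypothetical helper: the message is built only if the check fails. */
    static void checkStateLazy(boolean condition, Supplier<String> message) {
        if (!condition) {
            throw new IllegalStateException(message.get());
        }
    }

    public static void main(String[] args) {
        Expensive buffer = new Expensive();
        int id = 7; // any value other than -1, so both checks pass

        checkStateEager(id != -1, "Could not find id for entry: " + buffer);
        System.out.println("after eager check: " + buffer.toStringCalls); // prints 1

        checkStateLazy(id != -1, () -> "Could not find id for entry: " + buffer);
        System.out.println("after lazy check: " + buffer.toStringCalls); // still 1
    }
}
```

With the eager form, a passing check still pays the full cost of toString(); 
with the lazy form, that cost is only paid on the failure path.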
>>> > >
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > The watermark has not moved for this pattern to succeed ( or otherwise 
>>> > > ); the issue though is that it happens pretty early in the pipe ( like 
>>> > > within a minute ). I am replaying from a kafka topic but the keyed 
>>> > > operator has emitted no more than 1500-plus elements to 
>>> > > SelectCEPOperator ( very visible on the UI ), so I am sure not enough 
>>> > > elements have been added to the SharedBuffer to create memory stress.
>>> > >
>>> > > The nature of the input stream is that events are pushed out with a 
>>> > > specific timestamp ( it is a time series and the timestamp is the 
>>> > > beginning of the time slot ), as in one will have a bunch of elements 
>>> > > that have a constant timestamp till the next batch appears.
>>> > >
>>> > > A batch though does not have more elements than the number of keys ( 
>>> > > 600 ).
>>> > >
>>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > > This is a pretty simple pattern, as in I hardly have 1500 elements ( 
>>> > > across 600 keys at the max ) put in, 
>>> > > and though I have a pretty wide range, as in I am looking at a relaxed 
>>> > > pattern ( like 40 true conditions in 6 hours ), 
>>> > > I get this. I have EventTime turned on.
>>> > >
>>> > >
>>> > > java.lang.OutOfMemoryError: Java heap space
>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >       at java.lang.String.valueOf(String.java:2994)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>> > >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>>> > > .
>>> > > .
>>> > > .
>>> > >
>>> > > Has anyone seen this issue ?
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> >
>>> >
>>> 
>>> 
>> 
>> 
>> 
> 
> 
