We could not recreate the issue in a controlled setup, but here are a few
notes we have gathered on a simple "times(n).within(..)" pattern.

In the case where an Event does not lead to a Final or Stop state:

* As the NFA processes Events, it mutates on every true (accepting) Event.
Each computation is in effect a counter that keeps track of a partial
match; each true Event is appended to every already-existing partial match
for that computation unit. Essentially, for n Events where each Event is
true, there will be roughly n-1 computations, each representing a start
Event from 1 to n-1 (so the 1st computation will have n-1 events in its
partial match, the 2nd has n-2 events, and so on, until the (n-1)th has
only the last event as its partial match).

* If the WM (watermark) progresses beyond the timestamp of the 1st
computation, that partial match is pruned.

* It makes sure that a SharedBufferEntry is pruned only when the count of
Edges originating from it drops to 0 (via internalRemove(), which uses a
Stack). This should happen as the WM keeps progressing to the nth element
for unfulfilled patterns. A "null" event (not a fan of that design) is used
to establish WM progression.
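To make the counting in the first bullet concrete, here is a toy model of the bookkeeping as we understand it. This is entirely our own sketch, not Flink's NFA code; the class and method names are invented for illustration. Every true Event extends each existing partial match and also seeds a new computation:

```java
import java.util.ArrayList;
import java.util.List;

public class PartialMatchSketch {
    // simulate processing n consecutive true Events; each inner list is one
    // computation's partial match (the indices of the Events it has absorbed)
    static List<List<Integer>> process(int n) {
        List<List<Integer>> partialMatches = new ArrayList<>();
        for (int event = 1; event <= n; event++) {
            // a true Event extends every already-existing partial match...
            for (List<Integer> pm : partialMatches) {
                pm.add(event);
            }
            // ...and also starts a new computation holding just this Event
            List<Integer> fresh = new ArrayList<>();
            fresh.add(event);
            partialMatches.add(fresh);
        }
        return partialMatches;
    }

    public static void main(String[] args) {
        List<List<Integer>> pms = process(5);
        System.out.println(pms.size());        // prints 5: one live computation per Event
        System.out.println(pms.get(0).size()); // prints 5: the oldest partial match is longest
        System.out.println(pms.get(4).size()); // prints 1: the newest holds only the last Event
    }
}
```

In this toy every true Event both extends and seeds a computation, giving n live computations of sizes n down to 1; the exact off-by-one versus the n-1 described above depends on when a computation reaches the accepting state, but the quadratic growth in buffered events is the point.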
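Our reading of the watermark-driven pruning in the second and third bullets, again as a hedged toy sketch (the window length, timestamps, and prune rule here are our assumptions, not the actual NFA or SharedBuffer code): a partial match is dropped once the watermark moves past its start timestamp plus the within() window.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PruneSketch {
    // drop every partial match whose start timestamp has fallen out of the
    // window; returns how many were pruned
    static int prune(List<Long> startTimestamps, long watermark, long windowMillis) {
        Iterator<Long> it = startTimestamps.iterator();
        int pruned = 0;
        while (it.hasNext()) {
            if (watermark > it.next() + windowMillis) {
                it.remove();
                pruned++;
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        List<Long> starts = new ArrayList<>(List.of(0L, 1000L, 2000L));
        int pruned = prune(starts, 2500L, 1000L); // 1s window, WM at 2.5s
        System.out.println(pruned);        // prints 2: matches starting at 0 and 1000 are pruned
        System.out.println(starts.size()); // prints 1: only the match starting at 2000 survives
    }
}
```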


In the case where there is a FinalState (and we skipToFirstAfterLast):

* The NFA will prune (release) all partial matches, prune the shared
buffer, and emit the current match. The set of computations should now be
empty.
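Our mental model of that Final-state step, as a sketch (again our own toy, not the NFA internals; the skip-past-everything-on-match semantics mirror what we understand skipToFirstAfterLast to do): on a completed match, the match is emitted and everything accumulated so far is released.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalStateSketch {
    final List<List<String>> partialMatches = new ArrayList<>();
    final List<List<String>> emitted = new ArrayList<>();

    void onPartial(List<String> pm) {
        partialMatches.add(pm);
    }

    // when a computation reaches the Final state, emit the match and, under
    // skip-to-first-after-last semantics, drop every other partial match so
    // the shared-buffer entries behind them can be released
    void onFinalState(List<String> completedMatch) {
        emitted.add(completedMatch);
        partialMatches.clear();
    }

    public static void main(String[] args) {
        FinalStateSketch s = new FinalStateSketch();
        s.onPartial(new ArrayList<>(List.of("a")));
        s.onPartial(new ArrayList<>(List.of("b")));
        s.onFinalState(List.of("a", "b", "c"));
        System.out.println(s.partialMatches.size()); // prints 0: computations now empty
        System.out.println(s.emitted.size());        // prints 1: the current match was emitted
    }
}
```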

There is a lot to it, but is that roughly what that code does?



A few questions:

* What we have seen is that the OOM occurs in a call to SharedBuffer's
toString() method. There is no logging call in that code, so we are not
sure why the method runs or who calls it. Surely it is not part of the
serialization/deserialization routine, or is it? (Very surprising if it is.)
* There is no out-of-the-box implementation of an "m out of n" pattern
match. We have to resort to n in a range of (m * time series slot), which
we do. This is fine, but it does not allow an optimization where partial
matches can be pruned once enough false conditions have been seen. Simply
put, once more than n-m falses have been seen, there is no way that m out
of n will ever be true, and thus the SharedBuffer can be pruned back to the
last true seen (very akin to skipToFirstAfterLast).
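On the first question: one candidate explanation, based on the Preconditions.checkState call quoted later in this thread, is plain Java evaluation order. The message argument `"Could not find id for entry: " + sharedBuffer` is concatenated eagerly, before checkState even runs, so SharedBuffer.toString() executes on every call, including when the check passes, and that would put toString() on the serialize path. A minimal self-contained demonstration (the Entry class and the checkState stand-in are ours, not Flink's or Guava's):

```java
public class EagerMessageSketch {
    static int toStringCalls = 0;

    static class Entry {
        @Override
        public String toString() {
            toStringCalls++; // count how often the expensive toString runs
            return "entry";
        }
    }

    // stand-in for a checkState(boolean, Object) style precondition
    static void checkState(boolean ok, Object message) {
        if (!ok) throw new IllegalStateException(String.valueOf(message));
    }

    public static void main(String[] args) {
        Entry e = new Entry();
        // the message string is built BEFORE checkState is invoked, so
        // e.toString() executes even though the check passes
        checkState(true, "Could not find id for entry: " + e);
        System.out.println(toStringCalls); // prints 1
    }
}
```

If this is what happens inside serialization, a deeply recursive or very large SharedBuffer would make that eager toString() arbitrarily expensive, which matches the stack traces below.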
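On the second question, the early-abort pruning we are describing could look roughly like this. This is entirely our own sketch of the idea, not anything in the CEP library: track false counts per window and declare the partial match dead as soon as more than n-m falses make m trues unreachable.

```java
public class MOutOfNEarlyAbort {
    final int m; // required true events
    final int n; // window length in events
    int trues = 0;
    int falses = 0;

    MOutOfNEarlyAbort(int m, int n) {
        this.m = m;
        this.n = n;
    }

    // returns "MATCHED" once m trues are seen, "DEAD" once m-out-of-n is no
    // longer reachable (so the buffer could be pruned), "ALIVE" otherwise
    String onEvent(boolean conditionTrue) {
        if (conditionTrue) trues++; else falses++;
        if (trues >= m) return "MATCHED";
        if (falses > n - m) return "DEAD"; // more than n-m falses: give up now
        return "ALIVE";
    }

    public static void main(String[] args) {
        MOutOfNEarlyAbort match = new MOutOfNEarlyAbort(20, 25);
        String state = "ALIVE";
        // six falses in a row: 6 > 25 - 20, so we can give up long before
        // seeing all 25 events of the window
        for (int i = 0; i < 6 && state.equals("ALIVE"); i++) {
            state = match.onEvent(false);
        }
        System.out.println(state); // prints DEAD
    }
}
```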

We will keep instrumenting the code (which, apart from the "null" message,
is easily understandable), but would love to hear your feedback.

On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Thanks a lot Vishal!
>
> We are looking forward to a test case that reproduces the failure.
>
> Kostas
>
>
> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> This is the pattern. Will create a test case.
>
> /**
>  * @param condition a single condition applied as an acceptance criteria
>  * @param params defining the bounds of the pattern
>  * @param <U> the element in the stream
>  * @return compiled pattern along with the params
>  */
> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>         SimpleCondition<U> condition,
>         RelaxedContiguityWithinTime params,
>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>         String patternId) {
>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>     Pattern<U, ?> pattern = Pattern.
>             <U>begin(START).
>             where(condition);
>     if (params.elementCount > 1) {
>         pattern = pattern.
>                 followedBy(REST).
>                 where(condition).
>                 times(params.elementCount - 1);
>     }
>     return new RelaxedContiguousPattern<U>(
>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>             params,
>             params.elementCount > 1,
>             params.period.duration,
>             mapFunc,
>             patternId
>     );
> }
>
>
>
>
> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <
> wysakowicz.da...@gmail.com> wrote:
>
>> Could you provide some example to reproduce the case? Or the Pattern that
>> you are using? It would help track down the issue.
>>
>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>> >
>> > I have pulled in the flink master cep library and the runtime ( the
>> cluster ) is configured to work against the latest and greatest. This does
>> not happen with smaller range patterns ( 3 out of 5 , 1 of 3 etc) but is
>> always an issue when it is a larger range ( 20 out of 25 with range of 8
>> hours ) . Does that makes sense?
>> >
>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <
>> wysakowicz.da...@gmail.com> wrote:
>> > This problem sounds very similar to this one that was fixed for 1.4.1
>> and 1.5.0:
>> > https://issues.apache.org/jira/browse/FLINK-8226
>> >
>> > Could you check if that helps with your problem too?
>> >
>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>> > >
>> > > I have flink master CEP library code imported to  a 1.4 build.
>> > >
>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> > > A new one
>> > >
>> > > java.lang.OutOfMemoryError: Java heap space
>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>> > >       at java.lang.String.valueOf(String.java:2994)
>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>> > >       at java.lang.String.valueOf(String.java:2994)
>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>> > >       at java.lang.String.valueOf(String.java:2994)
>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>> > > .
>> > > .
>> > > .
>> > > It is the toString() on
>> > > SharedBuffer
>> > > no doubt. Some recursive loop ?
>> > >
>> > >
>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> > > It happens when it prepares to throw an exception and calls
>> sharedBuffer.toString(), because of the check....
>> > >
>> > >
>> > > int id = sharedBuffer.entryId;
>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " +
>> sharedBuffer);
>> > >
>> > >
>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> > > The watermark has not moved for this pattern to succeed ( or other
>> wise ), the issue though is that it is pretty early in the pipe ( like
>> within a minute ).  I am replaying from a kafka topic but the keyed
>> operator has emitted no more than 1500 plus elements to SelectCEPOperator (
>> very visible on the UI ) so am sure not enough elements have been added to
>> the SharedBuffer to create memory stress.
>> > >
>> > > The nature of the input stream is that events are pushed out with a
>> specific timestamp ( it is a time series and the timestamp if the beginning
>> of the time slot )  as in one will have a bunch of elements that have a
>> constant timestamp till the next batch appears.
>> > >
>> > > A batch though does not have more than the number of keys elements (
>> 600 ).
>> > >
>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> > > This is a pretty simple pattern, as in I hardly have 1500 elements (
>> across 600 keys at the max ) put in
>> > > and though I have a pretty wide range , as in I am looking at a
>> relaxed pattern ( like 40 true conditions in 6 hours ),
>> > > I get this. I have the EventTime turned on.
>> > >
>> > >
>> > > java.lang.OutOfMemoryError: Java heap space
>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>> > >       at java.lang.String.valueOf(String.java:2994)
>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>> > >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>> > > .
>> > > .
>> > > .
>> > >
>> > > Any one has seen this issue ?
>> > >
>> > >
>> > >
>> > >
>> > >
>> >
>> >
>>
>>
>
>
