You are right, I won’t be able to reproduce this problem without the data. One 
thing I can say, though, is that I think the problem is indeed with the 
hashcode. Unfortunately I don’t know Gson, but one strange thing I noticed is 
the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), 
[SharedBufferEdge(null, 1)], 1). The first {} is your event.toString, which 
seems odd, as if your event were empty.

Generally speaking, as I understand it, this exception is thrown because the 
hashcode of your event changes during serialization, so the lookup of the 
entry in some internal temporary cache fails.
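
To illustrate the kind of failure I mean, here is a minimal sketch in plain 
Java. It is not Flink's actual SharedBuffer code, and the class and field names 
(HashStabilityDemo, MutableEvent, payload) are made up for the example. It only 
shows that a hash-based lookup stops finding an entry once the hashcode of its 
key changes after insertion:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class HashStabilityDemo {

    // Hypothetical stand-in for an event class; NOT the real MilestoneEvent.
    static final class MutableEvent {
        String payload;

        MutableEvent(String payload) {
            this.payload = payload;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableEvent
                    && Objects.equals(payload, ((MutableEvent) o).payload);
        }

        // The hash depends on a mutable field, so it can change over time.
        @Override
        public int hashCode() {
            return Objects.hashCode(payload);
        }
    }

    // Inserts an event, changes its state, then looks the same object up again.
    static boolean foundAfterMutation() {
        Map<MutableEvent, Integer> ids = new HashMap<>();
        MutableEvent event = new MutableEvent("a");
        ids.put(event, 1);
        event.payload = "b"; // hashCode() now differs from the one stored in the map
        return ids.containsKey(event);
    }

    // Same lookup, but the key is left untouched after insertion.
    static boolean foundWithoutMutation() {
        Map<MutableEvent, Integer> ids = new HashMap<>();
        MutableEvent event = new MutableEvent("a");
        ids.put(event, 1);
        return ids.containsKey(event);
    }

    public static void main(String[] args) {
        System.out.println("found without mutation: " + foundWithoutMutation());
        System.out.println("found after mutation:   " + foundAfterMutation());
    }
}
```

If something similar happens to your event, for example if the value used for 
hashCode is regenerated or lost when the event is serialized and deserialized, 
the entry can no longer be found. I would check that hashCode and equals are 
computed only from fields that survive serialization unchanged.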

> On 10 Aug 2017, at 14:29, Daiqing Li <lidaiqing1...@gmail.com> wrote:
> 
> Hi,
> 
> Here is the code. But I am not sure if you can reproduce the problem without 
> data source.
> 
> Best,
> Daiqing
> 
> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz 
> <wysakowicz.da...@gmail.com> wrote:
> As @Kostas asked in your previous thread, would it be possible for you to 
> share your code for that job, or at least a minimal example that reproduces 
> this behaviour? I fear we won’t be able to help you without further info.
> 
> Regards,
> Dawid
> 
> > On 10 Aug 2017, at 14:10, Daiqing Li <lidaiqing1...@gmail.com> wrote:
> >
> > Hi Flink user,
> >
> > I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this 
> > exception after running for a while. Could anyone give me some help to 
> > debug this? I tried parallelism 1, and it has the same problem. I also 
> > tried reimplementing the hashCode and equals methods. I use a UUID as the 
> > hashcode right now.
> > 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
> > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
> >       at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
> >       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >       at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
> >       ... 6 more
> > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
> >       at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >       at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >       at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> >       at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> >       ... 5 more
> >       Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
> >               at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
> >               at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
> >               at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
> >               ... 5 more
> >
> 
> 
> <MilestoneEvent.java><example.java>
