Hi Thomas,

To avoid having jobs restart forever, you have to cancel them manually
(from the web interface or via the bin/flink command-line client). Also,
you can set an appropriate restart strategy (in 1.0-SNAPSHOT), which
limits the number of retries. That way the retrying will eventually stop.
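For example, cancelling from the command line:

    bin/flink cancel <jobID>

And a minimal sketch of limiting retries with the fixed-delay restart
strategy (written against the current 1.0-SNAPSHOT API, so names may
still change before the release):

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // Retry at most 3 times, waiting 10000 ms between attempts; after the
    // third failed attempt the job fails permanently instead of restarting.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));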
On Fri, Feb 19, 2016 at 4:05 PM, Thomas Lamirault <
thomas.lamira...@ericsson.com> wrote:

> I have resolved my issues. It seems that Avro had difficulties with my
> POJO. I changed the handling of null values and it works fine.
>
> But is there a way to cancel the old JobGraphs that are stuck in
> restarting status and keep only the last one to restart, other than
> cancelling the JobIDs manually?
>
> Thanks
>
> Thomas
> ________________________________________
> From: Thomas Lamirault [thomas.lamira...@ericsson.com]
> Sent: Friday, 19 February 2016 10:56
> To: user@flink.apache.org
> Subject: RE: Flink HA
>
> After setting this configuration, I get some exceptions:
>
> java.lang.Exception: Could not restore checkpointed state to operators and functions
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field
>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
>     at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
>     ... 3 more
> Caused by: java.lang.IllegalArgumentException: illegal signature
>     at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
>     ... 13 more
>
> If I run the application in non-HA mode, there is no problem. What can
> cause this kind of error?
>
> Thanks
>
> Thomas
> ________________________________________
> From: Thomas Lamirault [thomas.lamira...@ericsson.com]
> Sent: Friday, 19 February 2016 09:39
> To: user@flink.apache.org
> Subject: RE: Flink HA
>
> Thanks for the quick reply!
>
> > state.backend.fs.checkpointdir
>
> It is actually pointing to an HDFS directory, but I will modify
> recovery.zookeeper.path.root.
>
> > This is only relevant if you are using YARN.
> > From your complete
>
> Yes, I omitted to say that we will use YARN.
>
> > Does this help?
>
> Yes, a lot :-)
>
> Thomas
> ________________________________________
> From: Ufuk Celebi [u...@apache.org]
> Sent: Thursday, 18 February 2016 19:19
> To: user@flink.apache.org
> Subject: Re: Flink HA
>
> On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
> <thomas.lamira...@ericsson.com> wrote:
> > We are trying flink in HA mode.
>
> Great to hear!
>
> > We set in the flink yaml:
> >
> > state.backend: filesystem
> >
> > recovery.mode: zookeeper
> >
> > recovery.zookeeper.quorum: <Our quorum>
> >
> > recovery.zookeeper.path.root: <path>
> >
> > recovery.zookeeper.storageDir: <storageDir>
> >
> > recovery.backend.fs.checkpointdir: <pathcheckpoint>
>
> It should be state.backend.fs.checkpointdir. Just to check: both
> state.backend.fs.checkpointdir and recovery.zookeeper.path.root should
> point to a file system path.
>
> > yarn.application-attempts: 100
>
> This is only relevant if you are using YARN. From your complete
>
> > We want, in case of an application crash, the pending window to be
> > restored when the application restarts.
> >
> > Is pending data stored in the <storageDir>/blob directory?
> >
> > Also, we are trying to write a script that restarts the application
> > after the max attempts are exceeded, with the last pending window.
> >
> > How can I do that? Is a simple restart of the application enough, or
> > do I have to "clean" the recovery.zookeeper.path.root?
>
> Restore happens automatically to the most recently checkpointed state.
> Everything under <storageDir> contains the actual state (including
> JARs and JobGraph). ZooKeeper contains pointers to this state.
> Therefore, you must not delete the ZooKeeper root path.
>
> For the automatic restart, I would recommend using YARN. If you want
> to do it manually, you need to restart the JobManager/TaskManager
> instances. The application will be recovered automatically from
> ZooKeeper/state backend.
>
> Does this help?
>
> – Ufuk
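For reference, the corrected configuration discussed in this thread would
look roughly like the following flink-conf.yaml sketch; the hosts and
paths are illustrative placeholders, not values from the thread:

    state.backend: filesystem
    # Note: state.backend.fs.checkpointdir (not recovery.backend.fs.checkpointdir)
    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
    recovery.mode: zookeeper
    recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    recovery.zookeeper.path.root: /flink
    recovery.zookeeper.storageDir: hdfs:///flink/recovery
    yarn.application-attempts: 100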
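Thomas's POJO itself is not shown, so the following is only a hypothetical
sketch (class and field names are invented) of the kind of null-value fix
he describes: initialize fields to non-null defaults so the serializers
never encounter a null field.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical event POJO. Before the fix, `counters` might be left
    // null until first use; initializing it to an empty map means
    // serialization never sees a null field.
    public class MyEvent {
        private Map<String, Long> counters = new HashMap<>();

        public MyEvent() {} // Flink POJOs need a public no-arg constructor

        public Map<String, Long> getCounters() {
            return counters;
        }

        public void setCounters(Map<String, Long> counters) {
            this.counters = (counters == null) ? new HashMap<String, Long>() : counters;
        }
    }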