After set this configuration, I have some exceptions : java.lang.Exception: Could not restore checkpointed state to operators and functions at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406) ... 3 more Caused by: java.lang.IllegalArgumentException: illegal signature at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708) ... 13 more
If I run the application in not-HA mode, there is no problem. What can cause this kind of error ? Thanks Thomas ________________________________________De : Thomas Lamirault [thomas.lamira...@ericsson.com]Envoyé : vendredi 19 février 2016 09:39À : user@flink.apache.orgObjet : RE:Flink HAThanks for the quick reply !> state.backend.fs.checkpointdirIs actually pointing to a hdfs directory but I will modify the recovery.zookeeper.path.root> This is only relevant if you are using YARN. From your completeYes, I omit to say we will use YARN.>Does this help?Yes, a lot :-)Thomas________________________________________De : Ufuk Celebi [u...@apache.org]Envoyé : jeudi 18 février 2016 19:19À : user@flink.apache.orgObjet : Re: Flink HAOn Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault<thomas.lamira...@ericsson.com> wrote:> We are trying flink in HA mode.Great to hear!> We set in the flink yaml :>> state.backend: filesystem>> recovery.mode: zookeeper> recovery.zookeeper.quorum:<Our quorum>>> recovery.zookeeper.path.root: <path>>> recovery.zookeeper.storageDir: <storageDir>>> recovery.backend.fs.checkpointdir: <pathcheckpoint>It should be state.backend.fs.checkpointdir.Just to check: Both state.backend.fs.checkpointdir andrecovery.zookeeper.path.root should point to a file system path.> yarn.application-attempts: 100This is only relevant if you are using YARN. From your complete> We want in case of application crash, the pending window has to be restore> when the application restart.>> Pending data are store into the <storageDir>/blob directory ?>> Also, we try to write a script who restart the application after exceed the> max attempts, with the last pending window.>> How can I do that ? A simple restart of the application is enough, or do I> have to "clean" the recovery.zookeeper.path.root ?Restore happens automatically to the most recently checkpointed state.Everything under <storageDir> contains the actual state (includingJARs and JobGraph). ZooKeeper contains pointers to this state.Therefore, you must not delete the ZooKeeper root path.For the automatic restart, I would recommend using YARN. If you wantto do it manually, you need to restart the JobManager/TaskManagerinstances. The application will be recovered automatically fromZooKeeper/state backend.Does this help?– Ufuk