Thank you for considering this. If I understand you correctly:

* A CHK pointer on ZK for a CHK state on hdfs was written successfully.
* Some issue restarted the pipeline.
* The NN was unfortunately down, and flink could not retrieve the CHK state from the CHK pointer on ZK.

Before:
* The CHK pointer was removed and the job started from a brand new slate.

After (this fix on 1.4+):
* Do not delete the CHK pointer (it has to be subsumed to be deleted).
* Flink keeps using this CHK pointer (if retries are > 0 and until we hit any retry limit) to restore state.
* The NN comes back.
* Flink restores state on the next retry.

I would hope that is the sequence to follow; a rough sketch of what I mean is below.
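Something like this, written as a hedged pseudo-Java sketch rather than the actual ZooKeeperCompletedCheckpointStore code (CheckpointPointer, readStateFromDfs and MAX_RESTORE_ATTEMPTS are made-up names for illustration, not Flink APIs):

// Sketch only: the pointer in ZK is kept when the DFS read fails, so a later
// restart can retry once the NN is back. None of these names are Flink APIs.
public class RetainPointerOnRecoverySketch {

    private static final int MAX_RESTORE_ATTEMPTS = 10;

    interface CheckpointPointer { String path(); }

    static Object recover(CheckpointPointer latestPointer, int attempt) throws Exception {
        try {
            return readStateFromDfs(latestPointer);   // fails while the NN is down
        } catch (Exception e) {
            if (attempt >= MAX_RESTORE_ATTEMPTS) {
                throw new Exception("giving up after " + attempt + " restore attempts", e);
            }
            // keep the pointer in ZK and surface the failure -- no removeBrokenStateHandle()
            throw e;
        }
    }

    private static Object readStateFromDfs(CheckpointPointer p) throws Exception {
        // stand-in for the real read of the checkpoint metadata from HDFS
        throw new UnsupportedOperationException("HDFS unavailable: " + p.path());
    }
}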
Regards.

On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Vishal,
>
> I think you might be right. We fixed the problem that checkpoints were dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we still have the problem that if the DFS is not up at all then it will look as if the job is starting from scratch. The alternative is failing the job, in which case you will also never be able to restore from a checkpoint. What do you think?
>
> Best,
> Aljoscha
>
> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Sorry for the late reply.
>
> I created FLINK-8487 [1] to track this problem.
>
> @Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct?
> Please add more information if you think it is relevant.
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8487
>
> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Or this one
>>
>> https://issues.apache.org/jira/browse/FLINK-4815
>>
>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> ping.
>>>
>>> This happened again on production and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.
>>>
>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Folks, sorry for being late on this. Can somebody with knowledge of this code base create a jira issue for the above? We have seen this more than once on production.
>>>>
>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> Some relevant Jira issues for you are:
>>>>>
>>>>> - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
>>>>> - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
>>>>> - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> it would be great if you could create a JIRA ticket with Blocker priority.
>>>>> Please add all relevant information of your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
>>>>>
>>>>> Thanks for looking into this!
>>>>>
>>>>> Best regards,
>>>>> Fabian
>>>>>
>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>
>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>
>>>>>> Thank you for confirming.
>>>>>>
>>>>>> I think this is a critical bug. In essence any checkpoint store (hdfs/S3/File) will lose state if it is unavailable at resume.
>>>>>> This becomes all the more painful with your confirming that "failed checkpoints kill the job", b'coz essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless of course you have a retry of none or an unbounded retry delay, a delay that you *hope* the store revives in). Remember that the first retry failure will cause new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like an "abort_retry_on_chkretrevalfailure".
>>>>>>
>>>>>> In our case it is very important that we do not undercount a window, one reason we use flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).
>>>>>>
>>>>>> Please create a jira ticket for us to follow, or we could do it.
>>>>>>
>>>>>> PS Not aborting on checkpointing, till a configurable limit, is very important too.
>>>>>>
>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>>
>>>>>>> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?
>>>>>>>
>>>>>>> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour but there are plans to change this.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>> I think this is the offending piece. There is a catch-all Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.
>>>>>>>
>>>>>>> try {
>>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>>     if (completedCheckpoint != null) {
>>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>>     }
>>>>>>> } catch (Exception e) {
>>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>>         "checkpoint store.", e);
>>>>>>>     // remove the checkpoint with broken state handle
>>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>>> }
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> So this is the issue; tell us if it is wrong. ZK had some state (backed by hdfs) that referred to a checkpoint (the same exact last successful checkpoint that completed before the NN screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from hdfs b'coz the NN was down, and conveniently (and I think very wrongly) removed the CHK from being considered and cleaned the pointer (though that failed as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on hdfs, and failure to retrieve it should have been a stop-all, throwing the exception rather than trying some magic and starting from a blank state.
>>>>>>>>
>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Also note that the zookeeper recovery (sadly on the same hdfs cluster) showed the same behavior. It had the pointers to the chk point (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>>>>>>>>
>>>>>>>>> -rw-r--r-- 3 root hadoop 7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>
>>>>>>>>> -rw-r--r-- 3 root hadoop 7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>
>>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Another thing I noted was this:
>>>>>>>>>>
>>>>>>>>>> drwxr-xr-x - root hadoop 0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>>
>>>>>>>>>> drwxr-xr-x - root hadoop 0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>>
>>>>>>>>>> Generally what Flink does IMHO is that it replaces the chk point directory with a new one. I see it happening now; every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory, i.e. the chk-44286 directory. This chk-44286 was (I think) the last one successfully created before the NN had issues, and unlike the usual behavior it was not deleted. It looks as if the job started with a blank slate ???????? Does this strike a chord ?????
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time etc.).
>>>>>>>>>>>
>>>>>>>>>>> Coming back to this issue. We have that same kafka topic feeding a streaming druid datasource and we do not see any issue there, so data loss at the source (kafka) is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>>
>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>
>>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is at its default, true. I bring this up to see whether flink will in any circumstance drive consumption from the kafka-perceived offset rather than the one in the checkpoint.
>>>>>>>>>>>
>>>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>>>>
>>>>>>>>>>> * We have a good sense of SP (save point) and CP (checkpoint) and certainly understand that they actually are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>>>>>>>>>>
>>>>>>>>>>> * Other hdfs-related properties:
>>>>>>>>>>>
>>>>>>>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>>>
>>>>>>>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>>>
>>>>>>>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>>>>>>
>>>>>>>>>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>>>>>>>>>>
>>>>>>>>>>> Thank you for your patience and regards,
>>>>>>>>>>>
>>>>>>>>>>> Vishal
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>
>>>>>>>>>>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>>>>>>>>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.
>>>>>>>>>>>>
>>>>>>>>>>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery.
>>>>>>>>>>>> Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>
>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As far as I know, checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * hdfs went into safe mode b'coz of Name Node issues
>>>>>>>>>>>>>> * an exception was thrown
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>>>>>>>>> ..................
>>>>>>>>>>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint failures, once the hdfs issues were resolved.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either my kafka consumer kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had partial aggregates was lost. I have a 15 minute window of counts on a keyed operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The questions thus are:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>>> * Why, on restart, was the operator state not recreated?
>>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a save point work as expected?
>>>>>>>>>>>>>> * And though I am pretty sure, are operators like the Window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by flink?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>> >>> >> > >