Or this one https://issues.apache.org/jira/browse/FLINK-4815
On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

ping.

This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.

On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once on production.

On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

Some relevant Jira issues for you are:

- https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
- https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
- https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

Best,
Aljoscha

On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vishal,

It would be great if you could create a JIRA ticket with Blocker priority. Please add all the relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.

Thanks for looking into this!

Best regards,
Fabian

[1] https://lists.apache.org/list.html?user@flink.apache.org

2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

Thank you for confirming.

I think this is a critical bug. In essence, any checkpoint store (HDFS/S3/File) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that "failed checkpoints kill the job", because essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless of course you have a retry of none, or an unbounded retry delay within which you *hope* the store revives). Remember, the first retry failure will create new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like an "abort_retry_on_chkretrevalfailure".

In our case it is very important that we do not undercount a window, one reason we use Flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).

Please create a JIRA ticket for us to follow, or we could do it.

PS: Not aborting on checkpoint failures, up to a configurable limit, is very important too.

On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

I think you're right! And thanks for looking into this so deeply.

With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?

Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour but there are plans to change this.

Best,
Aljoscha
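For illustration, a minimal sketch of the bounded-retry side of this discussion, using Flink's RestartStrategies API. The proposed "abort_retry_on_chkretrevalfailure" property does not exist; the class name and the attempt/delay values below are illustrative only.

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BoundedRestartSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Retry a bounded number of times with a delay, so a transient HDFS
            // outage can pass before the job gives up, instead of the job
            // restarting indefinitely from a possibly blank state.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                    10,                              // max restart attempts (illustrative)
                    Time.of(2, TimeUnit.MINUTES)));  // delay between attempts (illustrative)

            // ... job graph definition and env.execute() would follow here.
        }
    }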
On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

I think this is the offending piece. There is a catch-all Exception handler, which IMHO should distinguish a recoverable exception from an unrecoverable one.

    try {
        completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
        if (completedCheckpoint != null) {
            completedCheckpoints.add(completedCheckpoint);
        }
    } catch (Exception e) {
        LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
            "checkpoint store.", e);
        // remove the checkpoint with broken state handle
        removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
    }
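For illustration, a hedged, self-contained sketch of the distinction being argued for here. This is not Flink's code; it only mirrors the loop above and shows treating a store outage (e.g. HDFS in safe mode) differently from a genuinely corrupt entry. The class and method names are invented for the example; only FlinkException comes from Flink.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;

    import org.apache.flink.util.FlinkException;

    /**
     * Illustrative only: recover a list of persisted entries, treating a store
     * outage (IOException) as fatal so the checkpoint pointer is kept, while a
     * genuinely corrupt entry is the only thing that gets skipped.
     */
    public final class RecoverableVsBrokenSketch {

        public static <T> List<T> recover(List<Callable<T>> retrievals) throws FlinkException {
            List<T> recovered = new ArrayList<>();
            for (Callable<T> retrieval : retrievals) {
                try {
                    T checkpoint = retrieval.call();
                    if (checkpoint != null) {
                        recovered.add(checkpoint);
                    }
                } catch (IOException e) {
                    // Recoverable store failure: abort recovery and keep the handle,
                    // so a later attempt (once HDFS is back) can still restore it.
                    throw new FlinkException("Checkpoint store unavailable; aborting recovery.", e);
                } catch (Exception e) {
                    // Unrecoverable: the entry itself is broken; only now is it
                    // reasonable to drop it (the role of removeBrokenStateHandle above).
                    System.err.println("Skipping broken checkpoint entry: " + e);
                }
            }
            return recovered;
        }
    }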
On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

So this is the issue; tell us if it is wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the exact same last checkpoint that was successful before the NN screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from HDFS because the NN was down, and conveniently (and I think very wrongly) removed the CHK from consideration and cleaned up the pointer (though that too failed, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on HDFS, and a failure to retrieve it should have been a stop-all, not-going-to-try-any-magic exception, rather than starting from a blank state.

    org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286
    from state handle under /0000000000000044286. This indicates that the
    retrieved state handle is broken. Try cleaning the state handle store.

On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Also note that the ZooKeeper recovery data (sadly on the same HDFS cluster) showed the same behavior. It had the pointers to the checkpoint (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.

    -rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
    -rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092

This is getting a little interesting. What say you :)

On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Another thing I noted was this:

    drwxr-xr-x   - root hadoop      0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
    drwxr-xr-x   - root hadoop      0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428

Generally what Flink does, IMHO, is replace the checkpoint directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 one, and hence the chk-44286 directory remains. This was (I think) the last checkpoint successfully created before the NN had issues, but unlike usual it did not delete this chk-44286. It looks as if the job started with a blank slate. Does this strike a chord?

On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello Fabian,

First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc.).

Coming back to this issue: we have that same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source, Kafka, is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.

Note these properties and see if they strike a chord.

* The setCommitOffsetsOnCheckpoints(boolean) for Kafka consumers is the default true. I bring this up to see whether Flink will in any circumstance drive consumption from the Kafka-perceived offset rather than the one in the checkpoint.

* state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.

* We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they are actually not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.

* Other HDFS-related properties:

    state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
    state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
    recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>

Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.

Thank you for your patience and regards,

Vishal
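On the first bullet: the Kafka connector documentation describes the offsets committed back to Kafka as a progress indicator only; with checkpointing enabled, recovery uses the offsets stored in the checkpoint. A minimal sketch, assuming the 0.10 connector of that era (class and package names vary by Flink/connector version; the topic, bootstrap servers, and group id below are placeholders):

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaOffsetSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // checkpoint every minute

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            props.setProperty("group.id", "my-consumer-group");   // placeholder

            FlinkKafkaConsumer010<String> consumer =
                    new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

            // With checkpointing enabled, offsets committed back to Kafka are only a
            // progress indicator for monitoring; on recovery the consumer starts from
            // the offsets stored in the checkpoint, not from Kafka's committed offsets.
            consumer.setCommitOffsetsOnCheckpoints(true);

            env.addSource(consumer).print();
            env.execute("kafka-offset-sketch");
        }
    }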
On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vishal,

Window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).

Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually recovered successfully (i.e., without an error) from the last checkpoint, all its state should have been restored. Since the last checkpoint was taken before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.

Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and usually discarded once the job is terminated, while savepoints are retained.

Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.

Best, Fabian

2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

To add to it, my pipeline is a simple

    keyBy(0)
        .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
        .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
        .reduce(new ReduceFunction(), new WindowFunction())
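For context, a self-contained sketch of that kind of pipeline, with checkpointing enabled so the window state Fabian describes is part of every checkpoint. The key, counts, window size, lateness, and bounded source below are placeholders; the real job reads from Kafka and windows on event time (timestamp/watermark assignment omitted here).

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class KeyedWindowCountSketch {

        public static void main(String[] args) throws Exception {
            final long windowSizeMinutes = 15; // the 15-minute keyed count window described above
            final long lateBySeconds = 60;     // illustrative lateness

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // window state and source offsets go into each checkpoint

            env.fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-a", 1L), Tuple2.of("key-b", 1L))
                .keyBy(0)
                .timeWindow(Time.of(windowSizeMinutes, TimeUnit.MINUTES))
                .allowedLateness(Time.of(lateBySeconds, TimeUnit.SECONDS))
                .reduce(
                    new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                            // Running count per key: the "intermediate result" state
                            // kept by the window operator.
                            return Tuple2.of(a.f0, a.f1 + b.f1);
                        }
                    },
                    new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple key, TimeWindow window,
                                          Iterable<Tuple2<String, Long>> reduced,
                                          Collector<Tuple2<String, Long>> out) {
                            // Emit the single pre-aggregated count when the window fires.
                            out.collect(reduced.iterator().next());
                        }
                    })
                .print();

            // With this bounded demo source the job ends before a 15-minute window
            // can fire; in the real job the source is a Kafka consumer.
            env.execute("keyed-window-count-sketch");
        }
    }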
On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello folks,

As far as I know, a checkpoint failure should be ignored and retried with potentially larger state. I had this situation:

* HDFS went into safe mode because of Name Node issues
* an exception was thrown

    org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
    Operation category WRITE is not supported in state standby.
    Visit https://s.apache.org/sbnn-error
    ..................
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)

* The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.

I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my Kafka consumer that kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had partial aggregates was lost. I have a 15-minute window of counts on a keyed operator.

I am using RocksDB and of course have checkpointing turned on.

The questions thus are:

* Should a pipeline be restarted if a checkpoint fails?
* Why, on restart, was the operator state not recreated?
* Does the nature of the exception thrown have anything to do with this, given that suspend and resume from a savepoint work as expected?
* And though I am pretty sure, are operators like the window operator stateful by default, so that if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by Flink?

Thanks.
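For reference, a minimal sketch of the state backend and checkpointing setup described in this message (RocksDB keyed state, checkpointed to HDFS). The checkpoint path, interval, and retention policy below are placeholders, and the externalized-checkpoint call is only available in Flink 1.2 and later.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetupSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // RocksDB keyed state, checkpointed to an HDFS directory like the one
            // in the flink-conf snippet earlier in the thread (path is a placeholder).
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));

            // Checkpoint once a minute, exactly-once (the default mode).
            env.enableCheckpointing(60_000);

            // Keep the last externalized checkpoint on cancellation so there is
            // always something to resume from if recovery goes wrong.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // ... sources, keyed windows, sinks, and env.execute() would follow here.
        }
    }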