Hi Vishal,

it would be great if you could create a JIRA ticket with Blocker priority. Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.
Thanks for looking into this!

Best regards,
Fabian

[1] https://lists.apache.org/list.html?user@flink.apache.org

2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
> Thank you for confirming.
>
> I think this is a critical bug. In essence, any checkpoint store (hdfs/S3/File) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming of "failed checkpoints killing the job", b'coz essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless of course you have a retry of none or an unbounded retry delay, a delay that you *hope* the store revives in). Remember, the first retry failure will cause new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like an "abort_retry_on_chkretrevalfailure".
>
> In our case it is very important that we do not undercount a window, one reason we use flink and its awesome failure guarantees, as various alarms sound (we do anomaly detection on the time series).
>
> Please create a jira ticket for us to follow, or we could do it.
>
> PS Not aborting on checkpointing, till a configurable limit, is very important too.
>
> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Vishal,
>>
>> I think you're right! And thanks for looking into this so deeply.
>>
>> With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down. If Flink had not deleted that checkpoint it might have been possible to restore it at a later point, right?
>>
>> Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour but there are plans to change this.
>>
>> Best,
>> Aljoscha
>>
>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>> I think this is the offending piece. There is a catch-all Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.
>>
>> try {
>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>     if (completedCheckpoint != null) {
>>         completedCheckpoints.add(completedCheckpoint);
>>     }
>> } catch (Exception e) {
>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>         "checkpoint store.", e);
>>     // remove the checkpoint with broken state handle
>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>> }
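For illustration only, a rough sketch of the recoverable-vs-unrecoverable split being asked for here. This is not the actual Flink code or an agreed fix; it reuses the names from the quoted snippet (retrieveCompletedCheckpoint, removeBrokenStateHandle, checkpointStateHandle, LOG) and assumes java.io.IOException and org.apache.flink.util.FlinkException are available to the enclosing class. The idea is simply that an I/O failure from the store aborts the recovery attempt instead of dropping the checkpoint.

try {
    completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
    if (completedCheckpoint != null) {
        completedCheckpoints.add(completedCheckpoint);
    }
} catch (IOException ioe) {
    // The store (HDFS/S3/...) is unreachable: the handle is not known to be broken,
    // so fail this recovery attempt and keep the checkpoint around for a later retry.
    throw new FlinkException("Checkpoint store unavailable, not discarding checkpoint.", ioe);
} catch (Exception e) {
    // Only a genuinely corrupted handle gets dropped from the store.
    LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
        "checkpoint store.", e);
    removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
}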
>>
>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> So this is the issue, and tell us if it is wrong. ZK had some state (backed by hdfs) that referred to a checkpoint (the same exact last successful checkpoint that was successful before NN screwed us). When the JM tried to recreate the state and, b'coz NN was down, failed to retrieve the CHK handle from hdfs, it conveniently (and I think very wrongly) removed the CHK from being considered and cleaned the pointer (though that failed, as NN was down, which is obvious from the dangling file in recovery). The metadata itself was on hdfs, and a failure in retrieving it should have been a stop-all exception, not an attempt to do magic, rather than starting from a blank state.
>>>
>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>>>
>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Also note that the zookeeper recovery (sadly on the same hdfs cluster) also showed the same behavior. It had the pointers to the chk point (I think that is what it does, keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
>>>>
>>>> -rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>> -rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>
>>>> This is getting a little interesting. What say you :)
>>>>
>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Another thing I noted was this:
>>>>>
>>>>> drwxr-xr-x   - root hadoop      0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>> drwxr-xr-x   - root hadoop      0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>
>>>>> Generally, what Flink does IMHO is replace the chk point directory with a new one. I see it happening now: every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 one and hence the chk-44286 directory remained. This was the last chk (I think) successfully created before NN had issues, but unlike usual it did not delete this chk-44286. It looks as if it started with a blank slate ???????? Does this strike a chord ?????
>>>>>
>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Hello Fabian,
>>>>>>
>>>>>> First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time etc.).
>>>>>>
>>>>>> Coming back to this issue. We have that same kafka topic feeding a streaming druid datasource and we do not see any issue there, so data loss on the source, kafka, is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay kafka with a new consumer group.id and that worked fine.
>>>>>>
>>>>>> Note these properties and see if they strike a chord (a minimal consumer sketch follows below for reference).
>>>>>>
>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is the default true. I bring this up to see whether flink will in any circumstance drive consumption from the kafka-perceived offset rather than the one in the checkpoint.
>>>>>>
>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set. The state is big enough though, therefore IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that, when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>
>>>>>> * We have a good sense of SP (save point) and CP (checkpoint) and certainly understand that they actually are not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.
>>>>>>
>>>>>> * Other hdfs-related properties:
>>>>>>
>>>>>> state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>> state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>> recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>
>>>>>> Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.
>>>>>>
>>>>>> Thank you for your patience and regards,
>>>>>>
>>>>>> Vishal
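For reference, a minimal sketch of how those consumer settings are typically wired up, assuming the 0.10 Kafka connector that was current at the time; the topic name, group.id and broker address are placeholders, not values from this job. Note that offsets committed back to Kafka via setCommitOffsetsOnCheckpoints are informational (used for monitoring, or when there is no checkpoint/savepoint to restore from); on recovery Flink restores from the offsets stored in the checkpoint, not from the ones Kafka has.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // one checkpoint per minute, as in this job

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "example-group");        // placeholder

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props);
        // Default is already true: completed checkpoints commit offsets back to Kafka,
        // but recovery always uses the offsets stored in the checkpoint itself.
        consumer.setCommitOffsetsOnCheckpoints(true);

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("kafka-consumer-sketch");
    }
}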
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).
>>>>>>>
>>>>>>> Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually recovered successfully (i.e., without an error) from the last checkpoint, all its state should have been restored. Since the last checkpoint was taken before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka. If that's not the case, we need to investigate this further, because a checkpoint recovery must not result in state loss.
>>>>>>>
>>>>>>> Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and are usually discarded once the job is terminated, while savepoints are retained.
>>>>>>>
>>>>>>> Regarding your question whether a failed checkpoint should cause the job to fail and recover: I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>
>>>>>>>> keyBy(0)
>>>>>>>>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>     .reduce(new ReduceFunction(), new WindowFunction())
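To make that concrete, a minimal sketch of how a pipeline of this shape sits inside a job with RocksDB and checkpointing enabled. The window size, lateness, checkpoint path, inline source, timestamps and reduce logic are placeholders and not the actual job (which reads from Kafka and also supplies a WindowFunction); it only illustrates that the window contents and partial aggregates are Flink-managed state and therefore part of every checkpoint.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedCountSketch {
    public static void main(String[] args) throws Exception {
        final int windowSize = 15; // minutes, placeholder
        final int lateBy = 60;     // seconds, placeholder

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60_000); // checkpoint every minute
        // Requires the flink-statebackend-rocksdb dependency; the path is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/placeholder"));

        env.fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-a", 2L), Tuple2.of("key-b", 3L))
            // Placeholder timestamps; the real job extracts an event-time field.
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    return System.currentTimeMillis();
                }
            })
            .keyBy(0)
            .timeWindow(Time.of(windowSize, TimeUnit.MINUTES))
            .allowedLateness(Time.of(lateBy, TimeUnit.SECONDS))
            // The partial aggregate kept by the ReduceFunction lives in Flink-managed
            // (RocksDB) window state, so it is included in every checkpoint.
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    return Tuple2.of(a.f0, a.f1 + b.f1);
                }
            })
            .print();

        env.execute("windowed-count-sketch");
    }
}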
>>>>>>>>
>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello folks,
>>>>>>>>>
>>>>>>>>> As far as I know, a checkpoint failure should be ignored and retried with potentially larger state. I had this situation:
>>>>>>>>>
>>>>>>>>> * hdfs went into safe mode b'coz of Name Node issues
>>>>>>>>> * this exception was thrown:
>>>>>>>>>
>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>>>>> ..................
>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>
>>>>>>>>> * The pipeline came back after a few restarts and checkpoint failures, after the hdfs issues were resolved.
>>>>>>>>>
>>>>>>>>> I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my kafka consumer that kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had partial aggregates was lost. I have a 15 minute window of counts on a keyed operator.
>>>>>>>>>
>>>>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>>>>
>>>>>>>>> The questions thus are:
>>>>>>>>>
>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>> * Why, on restart, did the operator state not get recreated?
>>>>>>>>> * Does the nature of the exception thrown have anything to do with this, b'coz suspend and resume from a save point work as expected?
>>>>>>>>> * And though I am pretty sure, are operators like the Window operator stateful by default, and thus if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by flink?
>>>>>>>>>
>>>>>>>>> Thanks.
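On the first question ("should a pipeline be restarted if a checkpoint fails?"): in the Flink version discussed in this thread a failed checkpoint did fail the job, as Aljoscha confirms above, and recovery behaviour was governed only by the restart strategy. A rough sketch of the relevant knobs follows; the tolerable-checkpoint-failure setting is an assumption about later Flink releases (it did not exist at the time of this thread), and all numbers are placeholders.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFailureSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // A bounded number of retries with a delay is what gives a temporarily
        // unavailable HDFS a chance to come back before the job gives up.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.of(30, TimeUnit.SECONDS)));

        // Later Flink releases make checkpoint failures survivable without failing the
        // job (the "plans to change this" mentioned above); this call is not available
        // in the version discussed in this thread.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        env.fromElements(1, 2, 3).print(); // placeholder pipeline so the job has operators

        env.execute("checkpoint-failure-sketch");
    }
}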