Yes. We have not hit the snag in 1.4.0 (our current version). Again, though, this occurs under sustained downtime on Hadoop, and it has been more stable lately :)
On Wed, Mar 7, 2018 at 4:09 PM, Stephan Ewen <se...@apache.org> wrote:

The assumption in your previous mail is correct.

Just to double check:

- The initially affected version you were running was 1.3.2, correct?

The issue should be fixed in all active branches (1.4, 1.5, 1.6) and additionally in 1.3.
Currently released versions with this fix: 1.4.0, 1.4.1. 1.5.0 is in the making.
We are looking to create a dedicated 1.3.3 for this fix.

On Thu, Jan 25, 2018 at 5:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

To add to this, we are assuming that the default configuration will fail a pipeline if a checkpoint fails, and will hit the recovery loop only if the retry limit is not reached.

On Thu, Jan 25, 2018 at 7:00 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Sorry. There are two scenarios:

* Idempotent sinks, where we would want to restore from the latest valid checkpoint. If I understand the code correctly, we try to retrieve all completed checkpoints for all handles in ZK and abort (throw an exception) if there are handles but no corresponding complete checkpoints in HDFS; otherwise we use the latest valid checkpoint state. On abort, a restart and thus a restore of the pipe is issued, repeating the above execution. If the failure in HDFS was transient, a retry will succeed; otherwise, when the retry limit is reached, the pipeline is aborted for good.

* Non-idempotent sinks, where we have no retries. We do not want to recover from the last available checkpoint as the above code will do, because the further into history we go, the more duplicates will be delivered. The only solution is to use exactly-once semantics of the source and sinks, if possible.

On Wed, Jan 24, 2018 at 7:20 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Did you see my second mail?

On 24. Jan 2018, at 12:50, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

As in, if there are chk handles in ZK, there should be no reason to start a new job (bad handle, no HDFS connectivity, etc.). Yes, that sums it up.

On Wed, Jan 24, 2018 at 5:35 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Wait a sec, I just checked out the code again and it seems we already do that: https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210

If there were some checkpoints but none could be read, we fail recovery.

On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org> wrote:

That sounds reasonable: we would keep the first fix, i.e. never delete checkpoints if they're "corrupt", only when they're subsumed. Additionally, we fail the job if there are some checkpoints in ZooKeeper but none of them can be restored, to prevent the case where a job starts from scratch even though it shouldn't.

Does that sum it up?

On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

If we hit the retry limit, abort the job. In our case we will restart from the last SP (we, as any production pipeline does, take one n times a day), and that, I would think, should be OK for most folks?
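For reference, a minimal sketch of the setup being discussed: a bounded restart strategy combined with failing the job on checkpoint errors. This assumes a 1.4/1.5-era DataStream API; the interval and limits are illustrative, and the setFailOnCheckpointingErrors call only makes explicit what the versions discussed here do by default.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class CheckpointRetryConfig {

    /** Applies the checkpoint/retry settings discussed in this thread. */
    public static void configure(StreamExecutionEnvironment env) {
        // Checkpoint once a minute (interval is illustrative).
        env.enableCheckpointing(60_000);

        // A failed checkpoint fails the job, which is described above as the
        // current default behaviour; this call is assumed to be available and
        // only makes that behaviour explicit.
        env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

        // Bounded restarts: if HDFS stays down for all 10 attempts spaced 30 s
        // apart, the job is aborted for good and has to be resumed from the
        // last savepoint, as described above.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(30)));
    }
}

With such a fixed-delay strategy, a transient NameNode outage is ridden out by the retries, while a sustained outage exhausts the limit and aborts the job so it can later be resumed from the last savepoint.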
On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Thank you for considering this. If I understand you correctly:

* A CHK pointer on ZK for a CHK state on HDFS was written successfully.
* Some issue restarted the pipeline.
* The NN was unfortunately down and Flink could not retrieve the CHK state from the CHK pointer on ZK.

Before:

* The CHK pointer was removed and the job started from a brand new slate.

After (this fix, on 1.4+):

* Do not delete the CHK pointer (it has to be subsumed to be deleted).
* Flink keeps using this CHK pointer (if retries > 0 and until we hit any retry limit) to restore state.
* The NN comes back.
* Flink restores state on the next retry.

I would hope that is the sequence to follow.

Regards.

On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

I think you might be right. We fixed the problem that checkpoints were dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we still have the problem that if the DFS is not up at all then it will look as if the job is starting from scratch. The alternative, though, is failing the job, in which case you will also never be able to restore from a checkpoint. What do you think?

Best,
Aljoscha

On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:

Sorry for the late reply.

I created FLINK-8487 [1] to track this problem.

@Vishal, can you have a look and check if I forgot some details? I logged the issue for Flink 1.3.2, is that correct? Please add more information if you think it is relevant.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8487

2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

Or this one: https://issues.apache.org/jira/browse/FLINK-4815

On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

ping.

This happened again on production, and it seems reasonable to abort when a checkpoint is not found rather than behave as if it is a brand new pipeline.

On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Folks, sorry for being late on this. Can somebody with knowledge of this code base create a JIRA issue for the above? We have seen this more than once on production.
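To make the behaviour agreed on above concrete, here is a simplified sketch (not the actual Flink source; the class and method names are invented) of the recovery semantics: handles found in ZooKeeper must yield at least one readable checkpoint, otherwise recovery fails instead of silently starting the job from scratch.

import java.util.ArrayList;
import java.util.List;

// Simplified illustration (invented names, not Flink code) of the recovery
// semantics discussed above.
final class CheckpointRecoverySketch {

    interface StateHandleStore {
        List<String> allHandles();                       // pointers stored in ZooKeeper
        byte[] retrieve(String handle) throws Exception; // reads metadata from HDFS
    }

    static List<byte[]> recover(StateHandleStore store) throws Exception {
        List<String> handles = store.allHandles();
        List<byte[]> completed = new ArrayList<>();
        for (String handle : handles) {
            try {
                completed.add(store.retrieve(handle));
            } catch (Exception e) {
                // Transient DFS outage: keep the handle (do NOT remove it from
                // ZooKeeper); it may become readable again once the NameNode is back.
            }
        }
        if (!handles.isEmpty() && completed.isEmpty()) {
            throw new Exception("Found " + handles.size() + " checkpoint handles in "
                + "ZooKeeper but could not read any of them; failing recovery rather "
                + "than restarting the job from an empty state.");
        }
        return completed;
    }
}

The important design choice, matching FLINK-7783, is that an unreadable handle is kept rather than removed, so a later retry can still restore it once HDFS is reachable again.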
On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

Some relevant Jira issues for you are:

- https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed checkpoints
- https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to earlier checkpoint when checkpoint restore fails
- https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

Best,
Aljoscha

On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vishal,

It would be great if you could create a JIRA ticket with Blocker priority. Please add all relevant information from your detailed analysis, add a link to this email thread (see [1] for the web archive of the mailing list), and post the id of the JIRA issue here.

Thanks for looking into this!

Best regards,
Fabian

[1] https://lists.apache.org/list.html?user@flink.apache.org

2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

Thank you for confirming.

I think this is a critical bug. In essence, any checkpoint store (HDFS/S3/file) will lose state if it is unavailable at resume. This becomes all the more painful with your confirming that failed checkpoints kill the job, because essentially it means that if the remote store is unavailable during a checkpoint then you have lost state (unless, of course, you have no retries or an unbounded retry delay, a delay that you *hope* the store revives in). Remember, the first retry failure will create new state, according to the code as written, iff the remote store is down. We would rather have a configurable property that establishes our desire to abort, something like "abort_retry_on_chkretrevalfailure".

In our case it is very important that we do not undercount a window (one reason we use Flink and its awesome failure guarantees), as various alarms sound (we do anomaly detection on the time series).

Please create a jira ticket for us to follow, or we could do it.

PS: Not aborting on checkpointing failures, up to a configurable limit, is very important too.

On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishal,

I think you're right! And thanks for looking into this so deeply.

With your last mail you're basically saying that the checkpoint could not be restored because your HDFS was temporarily down.
If Flink had not deleted that checkpoint, it might have been possible to restore it at a later point, right?

Regarding failed checkpoints killing the job: yes, this is currently the expected behaviour, but there are plans to change this.

Best,
Aljoscha

On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

I think this is the offending piece. There is a catch-all Exception, which IMHO should distinguish a recoverable exception from an unrecoverable one.

try {
    completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
    if (completedCheckpoint != null) {
        completedCheckpoints.add(completedCheckpoint);
    }
} catch (Exception e) {
    LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
        "checkpoint store.", e);
    // remove the checkpoint with broken state handle
    removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
}

On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

So this is the issue; tell us if it is wrong. ZK had some state (backed by HDFS) that referred to a checkpoint (the exact last checkpoint that was successful before the NN screwed us). When the JM tried to recreate the state, it failed to retrieve the CHK handle from HDFS because the NN was down, and conveniently (and, I think, very wrongly) removed the CHK from being considered and cleaned up the pointer (though that too failed, as the NN was down, which is obvious from the dangling file in recovery). The metadata itself was on HDFS, and a failure to retrieve it should have been a stop-everything exception, rather than an attempt to do magic and start from a blank state.

org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 from state handle under /0000000000000044286. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.

On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Also note that the ZooKeeper recovery (sadly on the same HDFS cluster) showed the same behavior. It had the pointers to the checkpoint (I think that is what it does: keeps metadata of where the checkpoint is, etc.). It too decided to keep the recovery file from the failed state.
-rw-r--r--   3 root hadoop   7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
-rw-r--r--   3 root hadoop   7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092

This is getting a little interesting. What say you :)

On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Another thing I noted was this:

drwxr-xr-x   - root hadoop   0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
drwxr-xr-x   - root hadoop   0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428

Generally, what Flink does IMHO is replace the checkpoint directory with a new one; I see it happening now, every minute it replaces the old directory. In this job's case, however, it did not delete the 2017-10-04 13:54 directory and hence chk-44286 remains. This was (I think) the last checkpoint successfully created before the NN had issues, yet unlike the usual behaviour it was not deleted. It looks as if the job started with a blank slate. Does this strike a chord?

On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello Fabian,

First of all, congratulations on this fabulous framework. I have worked with GDF, and though GDF has some natural pluses, Flink's state management is far more advanced. With Kafka as a source it negates issues GDF has (GDF integration with pub/sub is organic and that is to be expected, but non-FIFO pub/sub is an issue with windows on event time, etc.).

Coming back to this issue: we have the same Kafka topic feeding a streaming Druid datasource and we do not see any issue there, so data loss at the source (Kafka) is not applicable. I am totally certain that the "retention" time was not an issue. It is 4 days of retention and we fixed this issue within 30 minutes. We could replay Kafka with a new consumer group.id and that worked fine.

Note these properties and see if they strike a chord.

* The setCommitOffsetsOnCheckpoints(boolean) for Kafka consumers is left at the default, true. I bring this up to see whether Flink will, under any circumstance, drive consumption from the Kafka-perceived offset rather than the one in the checkpoint.
* The state.backend.fs.memory-threshold: 0 has not been set. The state is big enough, though, so IMHO there is no way the state is stored along with the metadata in the JM (or ZK?). The reason I bring this up is to make sure that when you say the size has to be less than 1024 bytes, you are talking about the cumulative state of the pipeline.

* We have a good sense of SP (savepoint) and CP (checkpoint) and certainly understand that they are actually not dissimilar. However, in this case there were multiple attempts to restart the pipe before it finally succeeded.

* Other HDFS-related properties:

state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>

Do these make sense? Is there anything else I should look at? Please also note that this is the second time this has happened. The first time I was vacationing and was not privy to the state of the Flink pipeline, but the net effect was similar: the counts for the first window after an internal restart dropped.

Thank you for your patience and regards,

Vishal

On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Vishal,

Window operators are always stateful because the operator needs to remember previously received events (WindowFunction) or intermediate results (ReduceFunction).

Given the program you described, a checkpoint should include the Kafka consumer offset and the state of the window operator. If the program eventually successfully (i.e., without an error) recovered from the last checkpoint, all its state should have been restored. Since the last checkpoint was before HDFS went into safe mode, the program would have been reset to that point. If the Kafka retention time is less than the time it took to fix HDFS, you would have lost data because it would have been removed from Kafka.
If that's not the case, we need to investigate this further because a checkpoint recovery must not result in state loss.

Restoring from a savepoint is not so much different from automatic checkpoint recovery. Given that you have a completed savepoint, you can restart the job from that point. The main difference is that checkpoints are only used for internal recovery and are usually discarded once the job is terminated, while savepoints are retained.

Regarding your question whether a failed checkpoint should cause the job to fail and recover, I'm not sure what the current status is. Stefan (in CC) should know what happens if a checkpoint fails.

Best, Fabian

2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

To add to it, my pipeline is a simple

keyBy(0)
    .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
    .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
    .reduce(new ReduceFunction(), new WindowFunction())

On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

Hello folks,

As far as I know, checkpoint failure should be ignored and retried, with potentially larger state. I had this situation:

* HDFS went into safe mode because of NameNode issues
* an exception was thrown:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
..................
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)

* The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.

I would not have worried about the restart, but it was evident that I lost my operator state. Either it was my Kafka consumer that kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that had partial aggregates was lost.
I have a 15-minute window of counts on a keyed operator.

I am using RocksDB and of course have checkpointing turned on.

The questions thus are:

* Should a pipeline be restarted if a checkpoint fails?
* Why, on restart, was the operator state not recreated?
* Does the nature of the exception thrown have anything to do with this, given that suspend and resume from a savepoint work as expected?
* And, though I am pretty sure, are operators like the window operator stateful by default, so that if I have timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the state is managed by Flink?

Thanks.
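For completeness, a self-contained sketch of the kind of job described in this thread: a keyed 15-minute time window with allowed lateness, the RocksDB state backend, and checkpointing enabled. The source, paths, and lateness value are placeholders (the real job reads from Kafka with event-time timestamps); only the overall shape is meant to match the pipeline above.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class KeyedCountPipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with checkpoints on HDFS; the path is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
        env.enableCheckpointing(60_000); // one checkpoint per minute

        // Placeholder source; the real job uses a FlinkKafkaConsumer with event-time
        // timestamps and watermarks, which are omitted here.
        DataStream<Tuple2<String, Long>> counts = env.fromElements(new Tuple2<>("key", 1L));

        counts
            .keyBy(0)
            .timeWindow(Time.of(15, TimeUnit.MINUTES))
            .allowedLateness(Time.of(30, TimeUnit.SECONDS))
            .reduce(
                (ReduceFunction<Tuple2<String, Long>>) (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1),
                new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<Tuple2<String, Long>> reduced,
                                      Collector<Tuple2<String, Long>> out) {
                        // The reduce pre-aggregates, so there is one element per key and window.
                        for (Tuple2<String, Long> count : reduced) {
                            out.collect(count);
                        }
                    }
                })
            .print();

        env.execute("keyed-count-pipeline");
    }
}

Because the window contents live in the RocksDB-backed operator state, everything between the last completed checkpoint and the failure is expected to be rebuilt from the restored state plus the replayed Kafka offsets, which is exactly the guarantee discussed above.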