Wait a sec, I just checked out the code again and it seems we already do that: 
https://github.com/apache/flink/blob/9071e3befb8c279f73c3094c9f6bddc0e7cce9e5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L210
 

If there were some checkpoints but none could be read we fail recovery.
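
To make that rule concrete, here is a rough sketch of the behaviour in plain Java. It is not the actual ZooKeeperCompletedCheckpointStore code: the class name CheckpointRecoverySketch, the method recover and the retrieve function are made up for illustration, and the handles stand in for the checkpoint pointers kept in ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the recovery loop; not the actual Flink class.
final class CheckpointRecoverySketch {

    /**
     * Tries to load every checkpoint pointer found in the store.
     * Pointers that cannot be read are skipped but never deleted here.
     * If pointers exist and none of them can be read, recovery fails
     * instead of silently starting the job from scratch.
     */
    static <H, C> List<C> recover(List<H> handles, Function<H, C> retrieve) {
        List<C> restored = new ArrayList<>();
        RuntimeException lastFailure = null;

        for (H handle : handles) {
            try {
                restored.add(retrieve.apply(handle));
            } catch (RuntimeException e) {
                // Keep the pointer: the storage (for example HDFS) may only be
                // temporarily unavailable, so a later attempt can still succeed.
                lastFailure = e;
            }
        }

        if (restored.isEmpty() && !handles.isEmpty()) {
            throw new IllegalStateException(
                "Found " + handles.size()
                    + " checkpoint pointer(s) but could not read any of them.",
                lastFailure);
        }
        return restored;
    }
}
```

With no pointers at all, recover returns an empty list and the job legitimately starts fresh. With pointers present but the DFS down, it throws, so the restart strategy can retry later rather than lose state.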

> On 24. Jan 2018, at 11:32, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> That sounds reasonable: We would keep the first fix, i.e. never delete 
> checkpoints if they're "corrupt", only when they're subsumed. Additionally, 
> we fail the job if there are some checkpoints in ZooKeeper but none of them 
> can be restored to prevent the case where a job starts from scratch even 
> though it shouldn't.
> 
> Does that sum it up?
> 
>> On 24. Jan 2018, at 01:19, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> 
>> If we hit the retry limit, abort the job. In our case we will restart from 
>> the last SP (as any production pipeline does, we take one n times a day), and 
>> I would think that should be OK for most folks?
>> 
>> On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> Thank you for considering this. If I understand you correctly:
>> 
>> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
>> * Some issue restarted the pipeline.
>> * The NN was down unfortunately and flink could not retrieve the  CHK state 
>> from the CHK pointer on ZK.
>> 
>> Before 
>> 
>> * The CHK pointer was being removed and the job started from a brand new 
>> slate.
>> 
>> After ( this fix on 1.4 +) 
>> 
>> * do not delete the CHK pointer ( It has to be subsumed to be deleted ). 
>> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit any 
>> retry limit ) to restore state
>> * NN comes back 
>> * Flink restores state on the next retry.
>> 
>> I would hope that is the sequence to follow.
>> 
>> Regards.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi Vishal,
>> 
>> I think you might be right. We fixed the problem that checkpoints were 
>> dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we 
>> still have the problem that if the DFS is not up at all then it will look 
>> as if the job 
>> is starting from scratch. However, the alternative is failing the job, in 
>> which case you will also never be able to restore from a checkpoint. What do 
>> you think?
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>> 
>>> Sorry for the late reply.
>>> 
>>> I created FLINK-8487 [1] to track this problem
>>> 
>>> @Vishal, can you have a look and check if I forgot some details? I logged 
>>> the issue for Flink 1.3.2, is that correct?
>>> Please add more information if you think it is relevant.
>>> 
>>> Thanks,
>>> Fabian
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>> 
>>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>> Or this one 
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-4815
>>> 
>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> ping. 
>>> 
>>>     This happened again on production and it seems reasonable to abort when 
>>> a checkpoint is not found rather than behave as if it is a brand new 
>>> pipeline.  
>>> 
>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> Folks, sorry for being late on this. Can somebody with knowledge of 
>>> this code base create a jira issue for the above? We have seen this more 
>>> than once on production.
>>> 
>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> Hi Vishal,
>>> 
>>> Some relevant Jira issues for you are:
>>> 
>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping failed 
>>> checkpoints
>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback to 
>>> earlier checkpoint when checkpoint restore fails
>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove 
>>> checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> 
>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>> 
>>>> Hi Vishal,
>>>> 
>>>> it would be great if you could create a JIRA ticket with Blocker priority.
>>>> Please add all relevant information of your detailed analysis, add a link 
>>>> to this email thread (see [1] for the web archive of the mailing list), 
>>>> and post the id of the JIRA issue here.
>>>> 
>>>> Thanks for looking into this!
>>>> 
>>>> Best regards,
>>>> Fabian
>>>> 
>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>> 
>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>> Thank you for confirming. 
>>>>        
>>>> 
>>>> I think this is a critical bug. In essence, a job on any checkpoint store 
>>>> (hdfs/S3/File) will lose state if the store is unavailable at resume. This 
>>>> becomes all the more painful with your confirming "failed 
>>>> checkpoints killing the job", because it essentially means that if the 
>>>> remote store is unavailable during a checkpoint then you have lost state 
>>>> (unless of course you have no retries, or an unbounded retry delay within 
>>>> which you hope the store revives). Remember that the first retry failure 
>>>> will cause new state, according to the code as written, iff the remote 
>>>> store is down. We would rather have a configurable property that 
>>>> establishes our desire to abort, something like 
>>>> "abort_retry_on_chkretrevalfailure".
>>>> 
>>>> 
>>>> In our case it is very important that we do not undercount a window (one 
>>>> reason we use flink and its awesome failure guarantees), as various alarms 
>>>> sound ( we do anomaly detection on the time series ).
>>>> 
>>>> Please create a jira ticket for us to follow or we could do it.
>>>> 
>>>> 
>>>> PS: Not aborting on a checkpoint failure, until a configurable limit is 
>>>> reached, is very important too.
>>>> 
>>>> 
>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> Hi Vishal,
>>>> 
>>>> I think you're right! And thanks for looking into this so deeply. 
>>>> 
>>>> With your last mail you're basically saying that the checkpoint could not 
>>>> be restored because your HDFS was temporarily down. If Flink had not 
>>>> deleted that checkpoint it might have been possible to restore it at a 
>>>> later point, right?
>>>> 
>>>> Regarding failed checkpoints killing the job: yes, this is currently the 
>>>> expected behaviour but there are plans to change this.
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> 
>>>>> I think this is the offending piece. There is a catch-all Exception 
>>>>> handler, which IMHO should distinguish a recoverable exception from an 
>>>>> unrecoverable one. 
>>>>> 
>>>>> 
>>>>>     try {
>>>>>         completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>         if (completedCheckpoint != null) {
>>>>>             completedCheckpoints.add(completedCheckpoint);
>>>>>         }
>>>>>     } catch (Exception e) {
>>>>>         LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>             "checkpoint store.", e);
>>>>> 
>>>>>         // remove the checkpoint with broken state handle
>>>>>         removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>     }
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> So this is the issue, and tell us if this is wrong. ZK had some state 
>>>>> (backed by hdfs) that referred to a checkpoint (the exact same last 
>>>>> checkpoint that was successful before the NN screwed us). When the JM 
>>>>> tried to recreate the state, it failed to retrieve the CHK handle from 
>>>>> hdfs because the NN was down, and conveniently (and I think very 
>>>>> wrongly) removed the CHK from consideration and cleaned the pointer 
>>>>> (though that failed too as the NN was down, which is obvious from the 
>>>>> dangling file in recovery). The metadata itself was on hdfs, and a 
>>>>> failure in retrieving it should have been a stop-all exception, rather 
>>>>> than trying to do magic and starting from a blank state.
>>>>> 
>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint 44286 
>>>>> from state handle under /0000000000000044286. This indicates that the 
>>>>> retrieved state handle is broken. Try cleaning the state handle store.
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> Also note that the zookeeper recovery (sadly on the same hdfs 
>>>>> cluster) also showed the same behavior. It had the pointers to the 
>>>>> checkpoint (I think that is what it does: it keeps metadata about where 
>>>>> the checkpoint is, etc.). It too decided to keep the recovery file from 
>>>>> the failed state.
>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55 /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>> 
>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07 /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>> 
>>>>> This is getting a little interesting. What say you :)
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> Another thing I noted was this thing
>>>>> 
>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>> 
>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15 /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>> 
>>>>> 
>>>>> 
>>>>> Generally what Flink does, IMHO, is replace the checkpoint 
>>>>> directory with a new one. I see it happening now: every minute it 
>>>>> replaces the old directory. In this job's case however, it did not 
>>>>> delete the 2017-10-04 13:54 directory, i.e. chk-44286. This was 
>>>>> (I think) the last checkpoint successfully created before the NN had 
>>>>> issues, but, contrary to what is usual, Flink did not delete this 
>>>>> chk-44286. It looks as if it started with a blank slate? Does this 
>>>>> strike a chord?
>>>>> 
>>>>> 
>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> Hello Fabian, 
>>>>>                       First of all congratulations on this fabulous 
>>>>> framework. I have worked with GDF and though GDF has some natural pluses 
>>>>> Flink's state management is far more advanced. With kafka as a source it 
>>>>> negates issues GDF has ( GDF integration with pub/sub is organic and that 
>>>>> is to be expected but non FIFO pub/sub is an issue with windows on event 
>>>>> time etc )
>>>>> 
>>>>> Coming back to this issue. We have that same kafka 
>>>>> topic feeding a streaming druid datasource and we do not see any issue 
>>>>> there, so data loss on the source, kafka, is not applicable. I am 
>>>>> totally certain that the "retention" time was not an issue. It is 4 days 
>>>>> of retention and we fixed this issue within 30 minutes. We could replay 
>>>>> kafka with a new consumer group.id and that worked fine. 
>>>>> 
>>>>> 
>>>>> Note these properties and see if they strike a chord.
>>>>> 
>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is the 
>>>>> default true. I bring this up to see whether flink will in any 
>>>>> circumstance drive consumption on the kafka perceived offset rather than 
>>>>> the one in the checkpoint.
>>>>> 
>>>>> * The state.backend.fs.memory-threshold: 0 has not been set. The state 
>>>>> is big enough though, therefore IMHO there is no way the state is stored 
>>>>> along with the metadata in the JM (or ZK?). The reason I bring this up 
>>>>> is to make sure that when you say the size has to be less than 
>>>>> 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>> 
>>>>> * We have a good sense of SP ( save point )  and CP ( checkpoint ) and 
>>>>> certainly understand that they actually are not dissimilar. However in 
>>>>> this case there were multiple attempts to restart the pipe before it 
>>>>> finally succeeded. 
>>>>> 
>>>>> * Other hdfs related properties.
>>>>>  
>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>> 
>>>>> 
>>>>> Do these make sense? Is there anything else I should look at? Please 
>>>>> also note that this is the second time this has happened. The first time 
>>>>> I was vacationing and was not privy to the state of the flink pipeline, 
>>>>> but the net effect was similar: the counts for the first window after an 
>>>>> internal restart dropped. 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> Thank you for your patience and regards,
>>>>> 
>>>>> Vishal
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>> Hi Vishal,
>>>>> 
>>>>> window operators are always stateful because the operator needs to 
>>>>> remember previously received events (WindowFunction) or intermediate 
>>>>> results (ReduceFunction).
>>>>> Given the program you described, a checkpoint should include the Kafka 
>>>>> consumer offset and the state of the window operator. If the program 
>>>>> eventually successfully (i.e., without an error) recovered from the last 
>>>>> checkpoint, all its state should have been restored. Since the last 
>>>>> checkpoint was before HDFS went into safe mode, the program would have 
>>>>> been reset to that point. If the Kafka retention time is less than the 
>>>>> time it took to fix HDFS you would have lost data because it would have 
>>>>> been removed from Kafka. If that's not the case, we need to investigate 
>>>>> this further because a checkpoint recovery must not result in state loss.
>>>>> 
>>>>> Restoring from a savepoint is not so much different from automatic 
>>>>> checkpoint recovery. Given that you have a completed savepoint, you can 
>>>>> restart the job from that point. The main difference is that checkpoints 
>>>>> are only used for internal recovery and usually discarded once the job is 
>>>>> terminated while savepoints are retained. 
>>>>> 
>>>>> Regarding your question if a failed checkpoint should cause the job to 
>>>>> fail and recover I'm not sure what the current status is.
>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>> 
>>>>> Best, Fabian
>>>>> 
>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>> To add to it, my pipeline is a simple 
>>>>> 
>>>>> keyBy(0)
>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>> 
>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> Hello folks,
>>>>> 
>>>>> As far as I know, a checkpoint failure should be ignored and the 
>>>>> checkpoint retried with potentially larger state. I had this situation:
>>>>> 
>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>> * exception was thrown
>>>>> 
>>>>>     
>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
>>>>>     ..................
>>>>> 
>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>> 
>>>>> * The pipeline came back after a few restarts and checkpoint failures, 
>>>>> after the hdfs issues were resolved.
>>>>> 
>>>>> I would not have worried about the restart, but it was evident that I 
>>>>> lost my operator state. Either my kafka consumer kept on 
>>>>> advancing its offset between a start and the next checkpoint failure (a 
>>>>> minute's worth), or the operator that had partial aggregates was 
>>>>> lost. I have a 15 minute window of counts on a keyed operator.
>>>>> 
>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>> 
>>>>> The questions thus are
>>>>> 
>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>> * Why was the operator state not recreated on restart?
>>>>> * Does the nature of the exception thrown have anything to do with this, 
>>>>> because suspend and resume from a save point work as expected?
>>>>> * And though I am pretty sure, are operators like the Window operator 
>>>>> stateful by default, and thus if I have timeWindow(Time.of(window_size, 
>>>>> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), 
>>>>> the state is managed by flink?
>>>>> 
>>>>> Thanks.
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
>> 
> 
