Thank you for considering this. If I understand you correctly:

* The CHK pointer on ZK for a CHK state on HDFS was written successfully.
* Some issue restarted the pipeline.
* Unfortunately the NN was down, and Flink could not retrieve the CHK state
from the CHK pointer on ZK.

Before

* The CHK pointer was removed and the job started from a blank
slate.

After (with this fix in 1.4+)

* Do not delete the CHK pointer (it has to be subsumed to be deleted).
* Flink keeps using this CHK pointer to restore state (if retry is > 0, and
until we hit any retry limit).
* The NN comes back.
* Flink restores state on the next retry.

I would hope that is the sequence to follow.
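
To make sure we mean the same thing, here is a rough sketch in plain Java of
the "after" sequence as I understand it. It is not Flink code:
CheckpointRestoreSketch, StateStore, restore and flakyStore are names I made
up for illustration, the store is simulated, and a real restart strategy
would also wait between attempts.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical illustration only; none of these names are Flink APIs. */
public class CheckpointRestoreSketch {

    /** Marker returned when there is no pointer at all (a genuinely new job). */
    public static final String FRESH_START = "fresh-start";

    /** Stand-in for reading checkpoint state from HDFS via the pointer kept in ZK. */
    public interface StateStore {
        String read(long checkpointId) throws IOException;
    }

    /**
     * Tries to restore from the newest retained pointer. A failed read never
     * removes the pointer; it only consumes one attempt. Returns the restored
     * state, or empty if every attempt up to the retry limit failed.
     */
    public static Optional<String> restore(Deque<Long> pointers, StateStore store, int maxRetries) {
        Long latest = pointers.peekLast();
        if (latest == null) {
            return Optional.of(FRESH_START);
        }
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return Optional.of(store.read(latest));
            } catch (IOException e) {
                // Store unavailable (e.g. NN down): keep the pointer and try again.
            }
        }
        // Retry limit hit: fail the restore instead of silently starting empty.
        return Optional.empty();
    }

    /** Simulated store that fails the first {@code failures} reads, then succeeds. */
    public static StateStore flakyStore(int failures) {
        int[] remaining = {failures};
        return id -> {
            if (remaining[0]-- > 0) {
                throw new IOException("NameNode unavailable");
            }
            return "state-of-chk-" + id;
        };
    }

    public static void main(String[] args) {
        Deque<Long> pointers = new ArrayDeque<>();
        pointers.add(44286L);
        Optional<String> restored = restore(pointers, flakyStore(2), 3);
        System.out.println(restored.orElse("gave up") + ", pointers kept: " + pointers);
    }
}
```

With two simulated failures and a retry limit of three, the third attempt
restores from chk 44286, and the pointer is still there afterwards. If the
store stays down past the limit, the result is empty rather than a blank
state, and the pointer is still retained.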

Regards.








On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Vishal,
>
> I think you might be right. We fixed the problem that checkpoints were
> dropped via https://issues.apache.org/jira/browse/FLINK-7783. However, we
> still have the problem that if the DFS is not up at all, it will look
> as if the job is starting from scratch. The alternative is failing
> the job, in which case you will also never be able to restore from a
> checkpoint. What do you think?
>
> Best,
> Aljoscha
>
>
> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Sorry for the late reply.
>
> I created FLINK-8487 [1] to track this problem.
>
> @Vishal, can you have a look and check if I forgot some details? I logged
> the issue for Flink 1.3.2, is that correct?
> Please add more information if you think it is relevant.
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8487
>
> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Or this one
>>
>> https://issues.apache.org/jira/browse/FLINK-4815
>>
>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> ping.
>>>
>>> This happened again in production, and it seems reasonable to abort
>>> when a checkpoint is not found rather than behave as if it is a brand new
>>> pipeline.
>>>
>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Folks, sorry for being late on this. Can somebody with knowledge of
>>>> this code base create a Jira issue for the above? We have seen this more
>>>> than once in production.
>>>>
>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> Some relevant Jira issues for you are:
>>>>>
>>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>>>> failed checkpoints
>>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>>>> fallback to earlier checkpoint when checkpoint restore fails
>>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> it would be great if you could create a JIRA ticket with Blocker
>>>>> priority.
>>>>> Please add all relevant information of your detailed analysis, add a
>>>>> link to this email thread (see [1] for the web archive of the mailing
>>>>> list), and post the id of the JIRA issue here.
>>>>>
>>>>> Thanks for looking into this!
>>>>>
>>>>> Best regards,
>>>>> Fabian
>>>>>
>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>
>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>
>>>>>> Thank you for confirming.
>>>>>>
>>>>>>
>>>>>> I think this is a critical bug. In essence, any checkpoint store
>>>>>> (HDFS/S3/file) will lose state if it is unavailable at resume. This
>>>>>> becomes all the more painful with your confirming "failed
>>>>>> checkpoints killing the job", because it essentially means that if the
>>>>>> remote store is unavailable during a checkpoint, then you have lost state
>>>>>> (unless of course you have a retry of none, or an unbounded retry delay,
>>>>>> a delay within which you *hope* the store revives). Remember, the first
>>>>>> retry failure will cause new state, according to the code as written, if
>>>>>> the remote store is down. We would rather have a configurable property
>>>>>> that establishes our desire to abort, something like
>>>>>> "abort_retry_on_chkretrevalfailure".
>>>>>>
>>>>>>
>>>>>> In our case it is very important that we do not undercount a window
>>>>>> (one reason we use Flink and its awesome failure guarantees), as various
>>>>>> alarms sound (we do anomaly detection on the time series).
>>>>>>
>>>>>> Please create a jira ticket for us to follow or we could do it.
>>>>>>
>>>>>>
>>>>>> PS: Not aborting on a checkpointing failure, until a configurable limit
>>>>>> is reached, is very important too.
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>>
>>>>>>> With your last mail you're basically saying that the checkpoint could
>>>>>>> not be restored because your HDFS was temporarily down. If Flink had not
>>>>>>> deleted that checkpoint it might have been possible to restore it at a
>>>>>>> later point, right?
>>>>>>>
>>>>>>> Regarding failed checkpoints killing the job: yes, this is currently
>>>>>>> the expected behaviour but there are plans to change this.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> I think this is the offending piece. There is a catch-all Exception
>>>>>>> handler, which IMHO should distinguish a recoverable exception from an
>>>>>>> unrecoverable one.
>>>>>>>
>>>>>>>
>>>>>>> try {
>>>>>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>>>>>     if (completedCheckpoint != null) {
>>>>>>>         completedCheckpoints.add(completedCheckpoint);
>>>>>>>     }
>>>>>>> } catch (Exception e) {
>>>>>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>>>>>         "checkpoint store.", e);
>>>>>>>     // remove the checkpoint with broken state handle
>>>>>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>>>>>> }
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> So this is the issue; tell us if we have it wrong. ZK had some
>>>>>>>> state (backed by HDFS) that referred to a checkpoint (the exact same
>>>>>>>> last checkpoint that succeeded before the NN failed on us). When
>>>>>>>> the JM tried to recreate the state, it failed to retrieve the CHK handle
>>>>>>>> from HDFS because the NN was down, and conveniently (and I think very
>>>>>>>> wrongly) removed the CHK from consideration and tried to clean up the
>>>>>>>> pointer (though that failed too, as the NN was down, which is obvious from
>>>>>>>> the dangling file in recovery). The metadata itself was on HDFS, and a
>>>>>>>> failure to retrieve it should have been a stop-all, "not going to try
>>>>>>>> doing magic" exception rather than starting from a blank state.
>>>>>>>>
>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>>>>>>> 44286 from state handle under /0000000000000044286. This indicates that
>>>>>>>> the retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Also note that the ZooKeeper recovery directory (sadly on the same
>>>>>>>>> HDFS cluster) showed the same behavior. It holds the pointers to the
>>>>>>>>> checkpoint (I think that is what it does: it keeps metadata about where
>>>>>>>>> the checkpoint is, etc.). It too kept the recovery file from the
>>>>>>>>> failed state.
>>>>>>>>>
>>>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>
>>>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>
>>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Another thing I noted was this thing
>>>>>>>>>>
>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>>
>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Generally, what Flink does IMHO is replace the checkpoint
>>>>>>>>>> directory with a new one. I see it happening now: every minute it
>>>>>>>>>> replaces the old directory. In this job's case, however, it did not
>>>>>>>>>> delete the 2017-10-04 13:54 directory, i.e. chk-44286. This was (I
>>>>>>>>>> think) the last checkpoint successfully created before the NN had
>>>>>>>>>> issues, but unlike the usual behavior Flink did not delete chk-44286.
>>>>>>>>>> It looks as if it started with a blank slate. Does this strike a chord?
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>>
>>>>>>>>>>> First of all, congratulations on this fabulous framework. I have
>>>>>>>>>>> worked with GDF, and though GDF has some natural pluses, Flink's state
>>>>>>>>>>> management is far more advanced. With Kafka as a source it negates
>>>>>>>>>>> issues GDF has (GDF integration with pub/sub is organic, and that is
>>>>>>>>>>> to be expected, but non-FIFO pub/sub is an issue with windows on
>>>>>>>>>>> event time etc.).
>>>>>>>>>>>
>>>>>>>>>>> Coming back to this issue. We have that same Kafka topic feeding a
>>>>>>>>>>> streaming Druid datasource and we do not see any issue there, so
>>>>>>>>>>> data loss on the source, Kafka, is not applicable. I am totally
>>>>>>>>>>> certain that the "retention" time was not an issue. It is 4 days of
>>>>>>>>>>> retention and we fixed this issue within 30 minutes. We could
>>>>>>>>>>> replay Kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>
>>>>>>>>>>> * setCommitOffsetsOnCheckpoints(boolean) for the Kafka consumers is
>>>>>>>>>>> at its default, true. I bring this up to see whether Flink will in
>>>>>>>>>>> any circumstance drive consumption from the offset Kafka perceives
>>>>>>>>>>> rather than the one in the checkpoint.
>>>>>>>>>>>
>>>>>>>>>>> * state.backend.fs.memory-threshold: 0 has not been set. The state is
>>>>>>>>>>> big enough, though, so IMHO there is no way the state is stored
>>>>>>>>>>> along with the metadata in the JM (or ZK?). The reason I bring this
>>>>>>>>>>> up is to make sure that when you say the size has to be less than
>>>>>>>>>>> 1024 bytes, you are talking about the cumulative state of the pipeline.
>>>>>>>>>>>
>>>>>>>>>>> * We have a good sense of SP (savepoint) and CP (checkpoint)
>>>>>>>>>>> and certainly understand that they are not dissimilar. However,
>>>>>>>>>>> in this case there were multiple attempts to restart the pipeline
>>>>>>>>>>> before it finally succeeded.
>>>>>>>>>>>
>>>>>>>>>>> * Other HDFS-related properties:
>>>>>>>>>>>
>>>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>>>>>>>>>>
>>>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>>>>>>
>>>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Do these make sense? Is there anything else I should look at?
>>>>>>>>>>> Please also note that this is the second time this has happened. The
>>>>>>>>>>> first time I was on vacation and was not privy to the state of the
>>>>>>>>>>> Flink pipeline, but the net effect was similar. The counts for the
>>>>>>>>>>> first window after an internal restart dropped.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thank you for your patience and regards,
>>>>>>>>>>>
>>>>>>>>>>> Vishal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>
>>>>>>>>>>>> window operators are always stateful because the operator needs
>>>>>>>>>>>> to remember previously received events (WindowFunction) or
>>>>>>>>>>>> intermediate results (ReduceFunction).
>>>>>>>>>>>> Given the program you described, a checkpoint should include
>>>>>>>>>>>> the Kafka consumer offset and the state of the window operator. If
>>>>>>>>>>>> the program eventually successfully (i.e., without an error)
>>>>>>>>>>>> recovered from the last checkpoint, all its state should have been
>>>>>>>>>>>> restored. Since the last checkpoint was before HDFS went into safe
>>>>>>>>>>>> mode, the program would have been reset to that point. If the Kafka
>>>>>>>>>>>> retention time is less than the time it took to fix HDFS you would
>>>>>>>>>>>> have lost data because it would have been removed from Kafka. If
>>>>>>>>>>>> that's not the case, we need to investigate this further because a
>>>>>>>>>>>> checkpoint recovery must not result in state loss.
>>>>>>>>>>>>
>>>>>>>>>>>> Restoring from a savepoint is not so much different from
>>>>>>>>>>>> automatic checkpoint recovery. Given that you have a completed
>>>>>>>>>>>> savepoint, you can restart the job from that point. The main
>>>>>>>>>>>> difference is that checkpoints are only used for internal recovery
>>>>>>>>>>>> and usually discarded once the job is terminated while savepoints
>>>>>>>>>>>> are retained.
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding your question if a failed checkpoint should cause the
>>>>>>>>>>>> job to fail and recover I'm not sure what the current status is.
>>>>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>
>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As far as I know, a checkpoint failure should be ignored and
>>>>>>>>>>>>>> retried with potentially larger state. I had this situation:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * HDFS went into safe mode because of NameNode issues
>>>>>>>>>>>>>> * an exception was thrown
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>>>>>>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>>>>>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>>>>>>>>     ..................
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>>>>>>>>> failures, after the hdfs issues were resolved.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would not have worried about the restart, but it was
>>>>>>>>>>>>>> evident that I lost my operator state. Either my Kafka consumer
>>>>>>>>>>>>>> kept advancing its offset between a start and the next checkpoint
>>>>>>>>>>>>>> failure (a minute's worth), or the operator state that held partial
>>>>>>>>>>>>>> aggregates was lost. I have a 15 minute window of counts on a
>>>>>>>>>>>>>> keyed operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am using RocksDB and of course have checkpointing turned
>>>>>>>>>>>>>> on.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The questions thus are:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>>>>>>>>>>> * Why was the operator state not recreated on restart?
>>>>>>>>>>>>>> * Does the nature of the exception thrown have anything to do with
>>>>>>>>>>>>>> this, because suspend and resume from a savepoint work as expected?
>>>>>>>>>>>>>> * And, though I am pretty sure, are operators like the window
>>>>>>>>>>>>>> operator stateful by default, so that if I have
>>>>>>>>>>>>>> timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()),
>>>>>>>>>>>>>> the state is managed by Flink?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
