Sorry for the late reply.

I created FLINK-8487 [1] to track this problem

@Vishal, can you have a look and check if if forgot some details? I logged
the issue for Flink 1.3.2, is that correct?
Please add more information if you think it is relevant.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8487

2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> Or this one
>
> https://issues.apache.org/jira/browse/FLINK-4815
>
> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> ping.
>>
>>     This happened again on production and it seems reasonable to abort
>> when a checkpoint is not found rather than behave as if it is a brand new
>> pipeline.
>>
>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Folks sorry for being late on this. Can some body with the knowledge of
>>> this code base create a jira issue for the above ? We have seen this more
>>> than once on production.
>>>
>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> Some relevant Jira issues for you are:
>>>>
>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>>> failed checkpoints
>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>>> fallback to earlier checkpoint when checkpoint restore fails
>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> it would be great if you could create a JIRA ticket with Blocker
>>>> priority.
>>>> Please add all relevant information of your detailed analysis, add a
>>>> link to this email thread (see [1] for the web archive of the mailing
>>>> list), and post the id of the JIRA issue here.
>>>>
>>>> Thanks for looking into this!
>>>>
>>>> Best regards,
>>>> Fabian
>>>>
>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>
>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> Thank you for confirming.
>>>>>
>>>>>
>>>>>  I think this is a critical bug. In essence any checkpoint store (
>>>>> hdfs/S3/File)  will loose state if it is unavailable at resume. This
>>>>> becomes all the more painful with your confirming that  "failed
>>>>> checkpoints killing the job"  b'coz essentially it mean that if remote
>>>>> store in unavailable  during checkpoint than you have lost state ( till of
>>>>> course you have a retry of none or an unbounded retry delay, a delay that
>>>>> you *hope* the store revives in ) .. Remember  the first retry
>>>>> failure  will cause new state according the code as written iff the remote
>>>>> store is down. We would rather have a configurable property that
>>>>> establishes  our desire to abort something like a
>>>>> "abort_retry_on_chkretrevalfailure"
>>>>>
>>>>>
>>>>> In our case it is very important that we do not undercount a window,
>>>>> one reason we use flink and it's awesome failure guarantees, as various
>>>>> alarms sound ( we do anomaly detection on the time series ).
>>>>>
>>>>> Please create a jira ticket for us to follow or we could do it.
>>>>>
>>>>>
>>>>> PS Not aborting on checkpointing, till a configurable limit is very
>>>>> important too.
>>>>>
>>>>>
>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>
>>>>>> With your last mail your basically saying, that the checkpoint could
>>>>>> not be restored because your HDFS was temporarily down. If Flink had not
>>>>>> deleted that checkpoint it might have been possible to restore it at a
>>>>>> later point, right?
>>>>>>
>>>>>> Regarding failed checkpoints killing the job: yes, this is currently
>>>>>> the expected behaviour but there are plans to change this.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> I think this is the offending piece. There is a catch all Exception,
>>>>>> which IMHO should understand a recoverable exception from an 
>>>>>> unrecoverable
>>>>>> on.
>>>>>>
>>>>>>
>>>>>> try {
>>>>>> completedCheckpoint = retrieveCompletedCheckpoint(ch
>>>>>> eckpointStateHandle);
>>>>>> if (completedCheckpoint != null) {
>>>>>> completedCheckpoints.add(completedCheckpoint);
>>>>>> }
>>>>>> } catch (Exception e) {
>>>>>> LOG.warn("Could not retrieve checkpoint. Removing it from the
>>>>>> completed " +
>>>>>> "checkpoint store.", e);
>>>>>> // remove the checkpoint with broken state handle
>>>>>> removeBrokenStateHandle(checkpointStateHandle.f1,
>>>>>> checkpointStateHandle.f0);
>>>>>> }
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> So this is the issue and tell us that it is wrong. ZK had some state
>>>>>>> ( backed by hdfs ) that referred to a checkpoint ( the same exact last
>>>>>>> successful checkpoint that was successful before NN screwed us ). When 
>>>>>>> the
>>>>>>> JM tried to recreate the state and b'coz NN was down failed to retrieve 
>>>>>>> the
>>>>>>> CHK handle from hdfs and conveniently ( and I think very wrongly ) 
>>>>>>> removed
>>>>>>> the CHK from being considered and cleaned the pointer ( though failed as
>>>>>>> was NN was down and is obvious from the dangling file in recovery ) . 
>>>>>>> The
>>>>>>> metadata itself was on hdfs and failure in retrieving should have been a
>>>>>>> stop all, not going to trying doing magic exception rather than starting
>>>>>>> from a blank state.
>>>>>>>
>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>>>>>> 44286 from state handle under /0000000000000044286. This indicates that 
>>>>>>> the
>>>>>>> retrieved state handle is broken. Try cleaning the state handle store.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Also note that  the zookeeper recovery did  ( sadly on the same
>>>>>>>> hdfs cluster ) also showed the same behavior. It had the pointers to 
>>>>>>>> the
>>>>>>>> chk point  ( I  think that is what it does, keeps metadata of where the
>>>>>>>> checkpoint etc  ) .  It too decided to keep the recovery file from the
>>>>>>>> failed state.
>>>>>>>>
>>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>
>>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>
>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Another thing I noted was this thing
>>>>>>>>>
>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>>>>>
>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Generally what Flink does IMHO is that it replaces the chk point
>>>>>>>>> directory with a new one. I see it happening now. Every minute it 
>>>>>>>>> replaces
>>>>>>>>> the old directory.  In this job's case however, it did not delete the
>>>>>>>>> 2017-10-04 13:54  and hence the chk-44286 directory.  This was the 
>>>>>>>>> last
>>>>>>>>> chk-44286 (  I think  )  successfully created before NN had issues 
>>>>>>>>> but as
>>>>>>>>> is usual did not delete this  chk-44286. It looks as if it started 
>>>>>>>>> with a
>>>>>>>>> blank slate ???????? Does this strike a chord ?????
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Fabian,
>>>>>>>>>>                       First of all congratulations on this
>>>>>>>>>> fabulous framework. I have worked with GDF and though GDF has some 
>>>>>>>>>> natural
>>>>>>>>>> pluses Flink's state management is far more advanced. With kafka as a
>>>>>>>>>> source it negates issues GDF has ( GDF integration with pub/sub is 
>>>>>>>>>> organic
>>>>>>>>>> and that is to be expected but non FIFO pub/sub is an issue with 
>>>>>>>>>> windows on
>>>>>>>>>> event time etc )
>>>>>>>>>>
>>>>>>>>>>                    Coming back to this issue. We have that same
>>>>>>>>>> kafka topic feeding a streaming druid datasource and we do not see 
>>>>>>>>>> any
>>>>>>>>>> issue there, so so data loss on the source, kafka is not applicable. 
>>>>>>>>>> I am
>>>>>>>>>> totally certain that the "retention" time was not an issue. It
>>>>>>>>>> is 4 days of retention and we fixed this issue within 30 minutes. We 
>>>>>>>>>> could
>>>>>>>>>> replay kafka with a new consumer group.id and that worked fine.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>
>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers
>>>>>>>>>> is the default true. I bring this up to see whether flink will in any
>>>>>>>>>> circumstance drive consumption on the kafka perceived offset rather 
>>>>>>>>>> than
>>>>>>>>>> the one in the checkpoint.
>>>>>>>>>>
>>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set.
>>>>>>>>>> The state is big enough though therefore IMHO no way the state is 
>>>>>>>>>> stored
>>>>>>>>>> along with the meta data in JM ( or ZK ? ) . The reason I bring this 
>>>>>>>>>> up is
>>>>>>>>>> to make sure when you say that the size has to be less than 
>>>>>>>>>> 1024bytes , you
>>>>>>>>>> are talking about cumulative state of the pipeine.
>>>>>>>>>>
>>>>>>>>>> * We have a good sense of SP ( save point )  and CP ( checkpoint
>>>>>>>>>> ) and certainly understand that they actually are not dissimilar. 
>>>>>>>>>> However
>>>>>>>>>> in this case there were multiple attempts to restart the pipe before 
>>>>>>>>>> it
>>>>>>>>>> finally succeeded.
>>>>>>>>>>
>>>>>>>>>> * Other hdfs related poperties.
>>>>>>>>>>
>>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%=
>>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>>
>>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root 
>>>>>>>>>> %>
>>>>>>>>>>
>>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= 
>>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Do these make sense ? Is there anything else I should look at.
>>>>>>>>>> Please also note that it is the second time this has happened. The 
>>>>>>>>>> first
>>>>>>>>>> time I was vacationing and was not privy to the state of the flink
>>>>>>>>>> pipeline, but the net effect were similar. The counts for the first 
>>>>>>>>>> window
>>>>>>>>>> after an internal restart dropped.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thank you for you patience and regards,
>>>>>>>>>>
>>>>>>>>>> Vishal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> window operators are always stateful because the operator needs
>>>>>>>>>>> to remember previously received events (WindowFunction) or 
>>>>>>>>>>> intermediate
>>>>>>>>>>> results (ReduceFunction).
>>>>>>>>>>> Given the program you described, a checkpoint should include the
>>>>>>>>>>> Kafka consumer offset and the state of the window operator. If the 
>>>>>>>>>>> program
>>>>>>>>>>> eventually successfully (i.e., without an error) recovered from the 
>>>>>>>>>>> last
>>>>>>>>>>> checkpoint, all its state should have been restored. Since the last
>>>>>>>>>>> checkpoint was before HDFS went into safe mode, the program would 
>>>>>>>>>>> have been
>>>>>>>>>>> reset to that point. If the Kafka retention time is less than the 
>>>>>>>>>>> time it
>>>>>>>>>>> took to fix HDFS you would have lost data because it would have been
>>>>>>>>>>> removed from Kafka. If that's not the case, we need to investigate 
>>>>>>>>>>> this
>>>>>>>>>>> further because a checkpoint recovery must not result in state loss.
>>>>>>>>>>>
>>>>>>>>>>> Restoring from a savepoint is not so much different from
>>>>>>>>>>> automatic checkpoint recovery. Given that you have a completed 
>>>>>>>>>>> savepoint,
>>>>>>>>>>> you can restart the job from that point. The main difference is that
>>>>>>>>>>> checkpoints are only used for internal recovery and usually 
>>>>>>>>>>> discarded once
>>>>>>>>>>> the job is terminated while savepoints are retained.
>>>>>>>>>>>
>>>>>>>>>>> Regarding your question if a failed checkpoint should cause the
>>>>>>>>>>> job to fail and recover I'm not sure what the current status is.
>>>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>
>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As far as I know checkpoint failure should be ignored and
>>>>>>>>>>>>> retried with potentially larger state. I had this situation
>>>>>>>>>>>>>
>>>>>>>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>>>>>>>> * exception was thrown
>>>>>>>>>>>>>
>>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.
>>>>>>>>>>>>> hadoop.ipc.StandbyException): Operation category WRITE is not
>>>>>>>>>>>>> supported in state standby. Visit https://s.apache.org/sbn
>>>>>>>>>>>>> n-error
>>>>>>>>>>>>>     ..................
>>>>>>>>>>>>>
>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hd
>>>>>>>>>>>>> fs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>         at org.apache.flink.core.fs.Safet
>>>>>>>>>>>>> yNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java
>>>>>>>>>>>>> :111)
>>>>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.
>>>>>>>>>>>>> FsCheckpointStreamFactory.createBasePath(FsCheck
>>>>>>>>>>>>> pointStreamFactory.java:132)
>>>>>>>>>>>>>
>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>>>>>>>> failures, after the hdfs issues were resolved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would not have worried about the restart, but it was evident
>>>>>>>>>>>>> that I lost my operator state. Either it was my kafka consumer 
>>>>>>>>>>>>> that kept on
>>>>>>>>>>>>> advancing it's offset between a start and the next checkpoint 
>>>>>>>>>>>>> failure ( a
>>>>>>>>>>>>> minute's worth ) or the the operator that had partial aggregates 
>>>>>>>>>>>>> was lost.
>>>>>>>>>>>>> I have a 15 minute window of counts on a keyed operator
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The questions thus are
>>>>>>>>>>>>>
>>>>>>>>>>>>> * Should a pipeline be restarted if checkpoint fails ?
>>>>>>>>>>>>> * Why on restart did the operator state did not recreate ?
>>>>>>>>>>>>> * Is the nature of the exception thrown have to do with any of
>>>>>>>>>>>>> this b'coz suspend and resume from a save point work as expected ?
>>>>>>>>>>>>> * And though I am pretty sure, are operators like the Window
>>>>>>>>>>>>> operator stateful by drfault and thus if I have 
>>>>>>>>>>>>> timeWindow(Time.of(window_
>>>>>>>>>>>>> size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new
>>>>>>>>>>>>> WindowFunction()), the state is managed by flink ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to