Hi Vishal,

it would be great if you could create a JIRA ticket with Blocker priority.
Please add all relevant information of your detailed analysis, add a link
to this email thread (see [1] for the web archive of the mailing list), and
post the id of the JIRA issue here.

Thanks for looking into this!

Best regards,
Fabian

[1] https://lists.apache.org/list.html?user@flink.apache.org

2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> Thank you for confirming.
>
>
>  I think this is a critical bug. In essence any checkpoint store (
> hdfs/S3/File)  will loose state if it is unavailable at resume. This
> becomes all the more painful with your confirming that  "failed
> checkpoints killing the job"  b'coz essentially it mean that if remote
> store in unavailable  during checkpoint than you have lost state ( till of
> course you have a retry of none or an unbounded retry delay, a delay that
> you *hope* the store revives in ) .. Remember  the first retry failure
>  will cause new state according the code as written iff the remote store is
> down. We would rather have a configurable property that establishes  our
> desire to abort something like a "abort_retry_on_chkretrevalfailure"
>
>
> In our case it is very important that we do not undercount a window, one
> reason we use flink and it's awesome failure guarantees, as various alarms
> sound ( we do anomaly detection on the time series ).
>
> Please create a jira ticket for us to follow or we could do it.
>
>
> PS Not aborting on checkpointing, till a configurable limit is very
> important too.
>
>
> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Vishal,
>>
>> I think you're right! And thanks for looking into this so deeply.
>>
>> With your last mail your basically saying, that the checkpoint could not
>> be restored because your HDFS was temporarily down. If Flink had not
>> deleted that checkpoint it might have been possible to restore it at a
>> later point, right?
>>
>> Regarding failed checkpoints killing the job: yes, this is currently the
>> expected behaviour but there are plans to change this.
>>
>> Best,
>> Aljoscha
>>
>> On 5. Oct 2017, at 17:40, Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>> I think this is the offending piece. There is a catch all Exception,
>> which IMHO should understand a recoverable exception from an unrecoverable
>> on.
>>
>>
>> try {
>> completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>> if (completedCheckpoint != null) {
>> completedCheckpoints.add(completedCheckpoint);
>> }
>> } catch (Exception e) {
>> LOG.warn("Could not retrieve checkpoint. Removing it from the completed "
>> +
>> "checkpoint store.", e);
>> // remove the checkpoint with broken state handle
>> removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.
>> f0);
>> }
>>
>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> So this is the issue and tell us that it is wrong. ZK had some state (
>>> backed by hdfs ) that referred to a checkpoint ( the same exact last
>>> successful checkpoint that was successful before NN screwed us ). When the
>>> JM tried to recreate the state and b'coz NN was down failed to retrieve the
>>> CHK handle from hdfs and conveniently ( and I think very wrongly ) removed
>>> the CHK from being considered and cleaned the pointer ( though failed as
>>> was NN was down and is obvious from the dangling file in recovery ) . The
>>> metadata itself was on hdfs and failure in retrieving should have been a
>>> stop all, not going to trying doing magic exception rather than starting
>>> from a blank state.
>>>
>>> org.apache.flink.util.FlinkException: Could not retrieve checkpoint
>>> 44286 from state handle under /0000000000000044286. This indicates that the
>>> retrieved state handle is broken. Try cleaning the state handle store.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Also note that  the zookeeper recovery did  ( sadly on the same hdfs
>>>> cluster ) also showed the same behavior. It had the pointers to the chk
>>>> point  ( I  think that is what it does, keeps metadata of where the
>>>> checkpoint etc  ) .  It too decided to keep the recovery file from the
>>>> failed state.
>>>>
>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>
>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>
>>>> This is getting a little interesting. What say you :)
>>>>
>>>>
>>>>
>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Another thing I noted was this thing
>>>>>
>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>>>>>
>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>>>>>
>>>>>
>>>>> Generally what Flink does IMHO is that it replaces the chk point
>>>>> directory with a new one. I see it happening now. Every minute it replaces
>>>>> the old directory.  In this job's case however, it did not delete the
>>>>> 2017-10-04 13:54  and hence the chk-44286 directory.  This was the last
>>>>> chk-44286 (  I think  )  successfully created before NN had issues but as
>>>>> is usual did not delete this  chk-44286. It looks as if it started with a
>>>>> blank slate ???????? Does this strike a chord ?????
>>>>>
>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Hello Fabian,
>>>>>>                       First of all congratulations on this fabulous
>>>>>> framework. I have worked with GDF and though GDF has some natural pluses
>>>>>> Flink's state management is far more advanced. With kafka as a source it
>>>>>> negates issues GDF has ( GDF integration with pub/sub is organic and that
>>>>>> is to be expected but non FIFO pub/sub is an issue with windows on event
>>>>>> time etc )
>>>>>>
>>>>>>                    Coming back to this issue. We have that same kafka
>>>>>> topic feeding a streaming druid datasource and we do not see any issue
>>>>>> there, so so data loss on the source, kafka is not applicable. I am 
>>>>>> totally
>>>>>> certain that the "retention" time was not an issue. It is 4 days of
>>>>>> retention and we fixed this issue within 30 minutes. We could replay 
>>>>>> kafka
>>>>>> with a new consumer group.id and that worked fine.
>>>>>>
>>>>>>
>>>>>> Note these properties and see if they strike a chord.
>>>>>>
>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka consumers is
>>>>>> the default true. I bring this up to see whether flink will in any
>>>>>> circumstance drive consumption on the kafka perceived offset rather than
>>>>>> the one in the checkpoint.
>>>>>>
>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set.  The
>>>>>> state is big enough though therefore IMHO no way the state is stored 
>>>>>> along
>>>>>> with the meta data in JM ( or ZK ? ) . The reason I bring this up is to
>>>>>> make sure when you say that the size has to be less than 1024bytes , you
>>>>>> are talking about cumulative state of the pipeine.
>>>>>>
>>>>>> * We have a good sense of SP ( save point )  and CP ( checkpoint )
>>>>>> and certainly understand that they actually are not dissimilar. However 
>>>>>> in
>>>>>> this case there were multiple attempts to restart the pipe before it
>>>>>> finally succeeded.
>>>>>>
>>>>>> * Other hdfs related poperties.
>>>>>>
>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%=
>>>>>> flink_hdfs_root %>
>>>>>>
>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>>>>>
>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= 
>>>>>> flink_hdfs_root %>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Do these make sense ? Is there anything else I should look at.
>>>>>> Please also note that it is the second time this has happened. The first
>>>>>> time I was vacationing and was not privy to the state of the flink
>>>>>> pipeline, but the net effect were similar. The counts for the first 
>>>>>> window
>>>>>> after an internal restart dropped.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thank you for you patience and regards,
>>>>>>
>>>>>> Vishal
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> window operators are always stateful because the operator needs to
>>>>>>> remember previously received events (WindowFunction) or intermediate
>>>>>>> results (ReduceFunction).
>>>>>>> Given the program you described, a checkpoint should include the
>>>>>>> Kafka consumer offset and the state of the window operator. If the 
>>>>>>> program
>>>>>>> eventually successfully (i.e., without an error) recovered from the last
>>>>>>> checkpoint, all its state should have been restored. Since the last
>>>>>>> checkpoint was before HDFS went into safe mode, the program would have 
>>>>>>> been
>>>>>>> reset to that point. If the Kafka retention time is less than the time 
>>>>>>> it
>>>>>>> took to fix HDFS you would have lost data because it would have been
>>>>>>> removed from Kafka. If that's not the case, we need to investigate this
>>>>>>> further because a checkpoint recovery must not result in state loss.
>>>>>>>
>>>>>>> Restoring from a savepoint is not so much different from automatic
>>>>>>> checkpoint recovery. Given that you have a completed savepoint, you can
>>>>>>> restart the job from that point. The main difference is that checkpoints
>>>>>>> are only used for internal recovery and usually discarded once the job 
>>>>>>> is
>>>>>>> terminated while savepoints are retained.
>>>>>>>
>>>>>>> Regarding your question if a failed checkpoint should cause the job
>>>>>>> to fail and recover I'm not sure what the current status is.
>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com
>>>>>>> >:
>>>>>>>
>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>
>>>>>>>> keyBy(0)
>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello folks,
>>>>>>>>>
>>>>>>>>> As far as I know checkpoint failure should be ignored and retried
>>>>>>>>> with potentially larger state. I had this situation
>>>>>>>>>
>>>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>>>> * exception was thrown
>>>>>>>>>
>>>>>>>>>     
>>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>>>     ..................
>>>>>>>>>
>>>>>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(
>>>>>>>>> HadoopFileSystem.java:453)
>>>>>>>>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.
>>>>>>>>> mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.
>>>>>>>>> FsCheckpointStreamFactory.createBasePath(FsCheck
>>>>>>>>> pointStreamFactory.java:132)
>>>>>>>>>
>>>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>>>> failures, after the hdfs issues were resolved.
>>>>>>>>>
>>>>>>>>> I would not have worried about the restart, but it was evident
>>>>>>>>> that I lost my operator state. Either it was my kafka consumer that 
>>>>>>>>> kept on
>>>>>>>>> advancing it's offset between a start and the next checkpoint failure 
>>>>>>>>> ( a
>>>>>>>>> minute's worth ) or the the operator that had partial aggregates was 
>>>>>>>>> lost.
>>>>>>>>> I have a 15 minute window of counts on a keyed operator
>>>>>>>>>
>>>>>>>>> I am using ROCKS DB and of course have checkpointing turned on.
>>>>>>>>>
>>>>>>>>> The questions thus are
>>>>>>>>>
>>>>>>>>> * Should a pipeline be restarted if checkpoint fails ?
>>>>>>>>> * Why on restart did the operator state did not recreate ?
>>>>>>>>> * Is the nature of the exception thrown have to do with any of
>>>>>>>>> this b'coz suspend and resume from a save point work as expected ?
>>>>>>>>> * And though I am pretty sure, are operators like the Window
>>>>>>>>> operator stateful by drfault and thus if I have 
>>>>>>>>> timeWindow(Time.of(window_
>>>>>>>>> size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new
>>>>>>>>> WindowFunction()), the state is managed by flink ?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Reply via email to