Also note that the ZooKeeper recovery directory (sadly on the same HDFS
cluster) showed the same behavior. It holds the pointers to the checkpoint
(I think that is what it does: keeps metadata of where the checkpoint
lives, etc.). It too decided to keep the recovery file from the failed
state.

-rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
/flink-recovery/prod/completedCheckpoint6c9096bb9ed4

-rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
/flink-recovery/prod/completedCheckpoint7c5a19300092

This is getting a little interesting. What say you? :)



On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Another thing I noted:
>
> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-44286
>
> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk-45428
>
>
> Generally, what Flink does (IMHO) is replace the checkpoint directory
> with a new one; I can see it happening now, with the old directory
> replaced every minute. In this job's case, however, it did not delete the
> 2017-10-04 13:54 directory, i.e. chk-44286. That was (I think) the last
> checkpoint successfully created before the NN had issues, but unlike the
> usual behavior it was not deleted. It looks as if the job started with a
> blank slate ???? Does this strike a chord?
>
> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <vishal.santo...@gmail.com
> > wrote:
>
>> Hello Fabian,
>>                       First of all, congratulations on this fabulous
>> framework. I have worked with GDF, and though GDF has some natural pluses,
>> Flink's state management is far more advanced. With Kafka as a source it
>> negates issues GDF has (GDF's integration with pub/sub is organic and that
>> is to be expected, but non-FIFO pub/sub is an issue with windows on event
>> time, etc.).
>>
>>                    Coming back to this issue. We have the same Kafka
>> topic feeding a streaming Druid datasource and we do not see any issue
>> there, so data loss at the source (Kafka) is not applicable. I am totally
>> certain that the "retention" time was not an issue: it is 4 days of
>> retention and we fixed this issue within 30 minutes. We could replay Kafka
>> with a new consumer group.id and that worked fine.
>>
>>
>> Note these properties and see if they strike a chord.
>>
>> * setCommitOffsetsOnCheckpoints(boolean) on the Kafka consumer is left at
>> its default of true. I bring this up to ask whether Flink will, under any
>> circumstance, drive consumption from the offset as perceived by Kafka
>> rather than the one in the checkpoint (a minimal sketch of what I mean is
>> below, after these properties).
>>
>> * state.backend.fs.memory-threshold: 0 has not been set. The state is big
>> enough, though, so IMHO there is no way it is stored along with the
>> metadata in the JM (or ZK?). The reason I bring this up is to make sure
>> that when you say the size has to be less than 1024 bytes, you are talking
>> about the cumulative state of the pipeline.
>>
>> * We have a good sense of SP (savepoint) and CP (checkpoint) and certainly
>> understand that they are not actually dissimilar. However, in this case
>> there were multiple attempts to restart the pipe before it finally
>> succeeded.
>>
>> * Other HDFS-related properties:
>>
>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%= flink_hdfs_root %>
>>
>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= flink_hdfs_root %>
>>
>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= flink_hdfs_root %>
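>>
>> A minimal sketch of the consumer wiring I was referring to above, assuming
>> the 0.10 connector (the topic, brokers, and group.id here are placeholders,
>> not our actual values):
>>
>>  import java.util.Properties;
>>  import org.apache.flink.streaming.api.datastream.DataStream;
>>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>>  import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>
>>  public class KafkaSourceSketch {
>>      public static void main(String[] args) throws Exception {
>>          StreamExecutionEnvironment env =
>>              StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>          Properties props = new Properties();
>>          props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder brokers
>>          props.setProperty("group.id", "placeholder-group");   // placeholder group.id
>>
>>          FlinkKafkaConsumer010<String> consumer =
>>              new FlinkKafkaConsumer010<>("placeholder-topic",
>>                  new SimpleStringSchema(), props);
>>
>>          // Default is true: offsets are committed back to Kafka on completed
>>          // checkpoints, but on restore the offsets stored in the checkpoint
>>          // (not the Kafka-committed ones) should drive consumption.
>>          consumer.setCommitOffsetsOnCheckpoints(true);
>>
>>          DataStream<String> stream = env.addSource(consumer);
>>          stream.print();
>>
>>          env.execute("kafka-source-sketch");
>>      }
>>  }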
>>
>>
>>
>> Do these make sense? Is there anything else I should look at? Please also
>> note that this is the second time this has happened. The first time I was
>> vacationing and was not privy to the state of the Flink pipeline, but the
>> net effect was similar: the counts for the first window after an internal
>> restart dropped.
>>
>>
>>
>>
>> Thank you for your patience, and regards,
>>
>> Vishal
>>
>>
>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> window operators are always stateful because the operator needs to
>>> remember previously received events (WindowFunction) or intermediate
>>> results (ReduceFunction).
>>> Given the program you described, a checkpoint should include the Kafka
>>> consumer offset and the state of the window operator. If the program
>>> eventually successfully (i.e., without an error) recovered from the last
>>> checkpoint, all its state should have been restored. Since the last
>>> checkpoint was before HDFS went into safe mode, the program would have been
>>> reset to that point. If the Kafka retention time is less than the time it
>>> took to fix HDFS you would have lost data because it would have been
>>> removed from Kafka. If that's not the case, we need to investigate this
>>> further because a checkpoint recovery must not result in state loss.
>>>
>>> Restoring from a savepoint is not so much different from automatic
>>> checkpoint recovery. Given that you have a completed savepoint, you can
>>> restart the job from that point. The main difference is that checkpoints
>>> are only used for internal recovery and usually discarded once the job is
>>> terminated while savepoints are retained.
>>>
>>> Regarding your question whether a failed checkpoint should cause the job
>>> to fail and recover, I'm not sure what the current status is.
>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>
>>> Best, Fabian
>>>
>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> To add to it, my pipeline is simply:
>>>>
>>>> keyBy(0)
>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>         .reduce(new ReduceFunction(), new WindowFunction())
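>>>>
>>>> Spelled out a bit more, a self-contained sketch of that shape of
>>>> pipeline, assuming Tuple2<String, Long> events keyed by the String field
>>>> (the class, method and variable names are made up for illustration, not
>>>> the actual job):
>>>>
>>>>  import java.util.concurrent.TimeUnit;
>>>>  import org.apache.flink.api.common.functions.ReduceFunction;
>>>>  import org.apache.flink.api.java.tuple.Tuple;
>>>>  import org.apache.flink.api.java.tuple.Tuple2;
>>>>  import org.apache.flink.streaming.api.datastream.DataStream;
>>>>  import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
>>>>  import org.apache.flink.streaming.api.windowing.time.Time;
>>>>  import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>  import org.apache.flink.util.Collector;
>>>>
>>>>  public class WindowedCountSketch {
>>>>
>>>>      static DataStream<Tuple2<String, Long>> windowedCounts(
>>>>              DataStream<Tuple2<String, Long>> events,
>>>>              long windowSize, long lateBy) {
>>>>          return events
>>>>              .keyBy(0)
>>>>              .timeWindow(Time.of(windowSize, TimeUnit.MINUTES))
>>>>              .allowedLateness(Time.of(lateBy, TimeUnit.SECONDS))
>>>>              .reduce(
>>>>                  new ReduceFunction<Tuple2<String, Long>>() {
>>>>                      @Override
>>>>                      public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
>>>>                                                         Tuple2<String, Long> b) {
>>>>                          // the running partial count per key and window: this is
>>>>                          // the window state a checkpoint is expected to capture
>>>>                          return new Tuple2<>(a.f0, a.f1 + b.f1);
>>>>                      }
>>>>                  },
>>>>                  new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>,
>>>>                          Tuple, TimeWindow>() {
>>>>                      @Override
>>>>                      public void apply(Tuple key, TimeWindow window,
>>>>                                        Iterable<Tuple2<String, Long>> reduced,
>>>>                                        Collector<Tuple2<String, Long>> out) {
>>>>                          // with an incremental reduce there is exactly one
>>>>                          // pre-aggregated element per fired window
>>>>                          out.collect(reduced.iterator().next());
>>>>                      }
>>>>                  });
>>>>      }
>>>>  }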
>>>>
>>>>
>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hello folks,
>>>>>
>>>>> As far as I know, a checkpoint failure should be ignored and retried
>>>>> with potentially larger state. I had this situation:
>>>>>
>>>>> * HDFS went into safe mode because of NameNode issues
>>>>> * an exception was thrown
>>>>>
>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>>> Operation category WRITE is not supported in state standby. Visit
>>>>> https://s.apache.org/sbnn-error
>>>>>     ..................
>>>>>
>>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>>>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>>>>>
>>>>> * The pipeline came back after a few restarts and checkpoint failures,
>>>>> once the HDFS issues were resolved.
>>>>>
>>>>> I would not have worried about the restart, but it was evident that I
>>>>> lost my operator state. Either my Kafka consumer kept on advancing its
>>>>> offset between a start and the next checkpoint failure (a minute's
>>>>> worth), or the operator that held the partial aggregates was lost. I
>>>>> have a 15-minute window of counts on a keyed operator.
>>>>>
>>>>> I am using RocksDB and of course have checkpointing turned on.
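>>>>>
>>>>> Roughly the kind of setup I mean, as a sketch (the HDFS path and the
>>>>> one-minute interval below are illustrative, not the exact production
>>>>> values):
>>>>>
>>>>>  import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>>  public class CheckpointSetupSketch {
>>>>>      public static void main(String[] args) throws Exception {
>>>>>          StreamExecutionEnvironment env =
>>>>>              StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>          // RocksDB state backend, writing checkpoints to HDFS
>>>>>          env.setStateBackend(
>>>>>              new RocksDBStateBackend("hdfs:///flink-checkpoints/prod"));
>>>>>          // checkpoint every minute
>>>>>          env.enableCheckpointing(60 * 1000);
>>>>>          // ... sources, keyed windows, sinks ...
>>>>>          env.execute("checkpoint-setup-sketch");
>>>>>      }
>>>>>  }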
>>>>>
>>>>> The questions thus are
>>>>>
>>>>> * Should a pipeline be restarted if a checkpoint fails?
>>>>> * Why, on restart, was the operator state not recreated?
>>>>> * Does the nature of the exception thrown have anything to do with this,
>>>>> given that suspend and resume from a savepoint work as expected?
>>>>> * And, though I am pretty sure of it, are operators like the window
>>>>> operator stateful by default, so that if I have
>>>>> timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new
>>>>> ReduceFunction(), new WindowFunction()), the state is managed by Flink?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>>
>>
>
