Re: Stopping of a streaming job empties state store on HDFS

2018-06-15 Thread Till Rohrmann
Hi Peter,

this sounds very strange. I just tried to reproduce the issue locally, but
for me it worked without a problem. Could you maybe share the JobManager
logs at DEBUG level with us?

As a side note, enabling the asynchronous checkpointing mode for the
FsStateBackend has no effect on the RocksDBStateBackend. If you want to
enable asynchronous checkpointing, you should instead call
`new RocksDBStateBackend(new FsStateBackend(stateStoreLocation), true)`.
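
A minimal sketch of that setup could look like the following (the
stateStoreLocation value is just a placeholder, and the wrapping class and
main method are only there to make the snippet self-contained):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder checkpoint location on HDFS; adjust to your environment.
    String stateStoreLocation = "hdfs:///flink/checkpoints";

    // Pass the flag on the RocksDBStateBackend itself, as described above;
    // a flag set on the wrapped FsStateBackend has no effect on RocksDB snapshots.
    env.setStateBackend(new RocksDBStateBackend(new FsStateBackend(stateStoreLocation), true));

    // ... build the pipeline and call env.execute(...)
  }
}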

Cheers,
Till

On Fri, Jun 15, 2018 at 9:57 AM Peter Zende  wrote:

> Hi Stefan,
>
> Thanks for the answer.
> Fixing the uids solved the problem; that's not an issue anymore.
> The savepoint directory is there, but the RocksDB state is not restored
> after restarting the application, because that state directory was removed
> when I stopped the application. It looks like the savepoints themselves
> don't contain the RocksDB state files. What we have is:
> env.setStateBackend(new RocksDBStateBackend(new FsStateBackend(stateStoreLocation, true)))
> -> this location got emptied.
> In the meantime we switched to cancelWithSavepoint, which doesn't have
> this behavior and works fine; however, the YARN application status ends up
> as FAILED instead of SUCCEEDED, which is what we had in case of stopping.
>
> What are the use cases of stopping? We implemented it because we wanted
> to ensure that the application shuts down correctly and we don't end up
> in an inconsistent/broken state.
>
> Thanks,
> Peter
>
>
> 2018-06-11 11:31 GMT+02:00 Stefan Richter :
>
>> Hi,
>>
>> > On 08.06.2018 at 01:16, Peter Zende wrote:
>> >
>> > Hi all,
>> >
> > We have a streaming pipeline (Flink 1.4.2) for which we implemented
>> stoppable sources to be able to gracefully exit from the job with Yarn
>> state "finished/succeeded".
>> > This works fine; however, after creating a savepoint, stopping the job
>> (stop event) and restarting it, we noticed that the RocksDB state hadn't
>> been recovered. It looks like this is because the state directory on HDFS
>> was emptied after issuing a stop event. This isn't the case when we cancel
>> the job, but we'd like to distinguish between job failures and stop events.
>> After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still
>> not clear why this is the intended behavior.
>> > Should we use cancel instead?
>>
>> Savepoints should _not_ be cleaned up in case of stop or cancellation;
>> checkpoints, on the other hand, should be cleaned up. Where are you storing
>> the created savepoints? They should not go into the checkpoint directory.
>> Stop is intended to be a more „graceful“ variant of cancel, but I think it
>> is rarely used with Flink. I would prefer cancel unless you really need to
>> use stoppable for some particular reason.
>>
>> > When we back up the local state directory, stop the job, copy the
>> directory back and start a new job from the savepoint, then it works fine.
>> > Another issue is that when we restart the job with different sources
>> (1st job: HDFS and Kafka; 2nd job: Kafka), each having uids set, the
>> recovery from the savepoint doesn't fail, but the local state isn't
>> restored. Is there any trick besides setting allowNonRestoredState?
>>
>>
>> I need to clarify here: when you say „each having uids set“, do you set
>> the same uids for both types of sources? The uids must match, because Flink
>> reassigns state on restore based on the uids, i.e. state x goes to the
>> operator whose uid matches the uid of the operator that created it in the
>> previous job. The flag allowNonRestoredState exists to tolerate state from
>> a checkpoint/savepoint that does not find a matching operator to which it
>> should be assigned (no operator with a matching uid exists in the job
>> graph). For example, you want this if you removed operators from the job.
>>
>> Best,
>> Stefan
>>
>>
>


Re: Stopping of a streaming job empties state store on HDFS

2018-06-15 Thread Peter Zende
Hi Stefan,

Thanks for the answer.
Fixing the uids solved the problem; that's not an issue anymore.
The savepoint directory is there, but the RocksDB state is not restored
after restarting the application, because that state directory was removed
when I stopped the application. It looks like the savepoints themselves
don't contain the RocksDB state files. What we have is:
env.setStateBackend(new RocksDBStateBackend(new FsStateBackend(stateStoreLocation, true)))
-> this location got emptied.
In the meantime we switched to cancelWithSavepoint, which doesn't have this
behavior and works fine; however, the YARN application status ends up as
FAILED instead of SUCCEEDED, which is what we had in case of stopping.

What are the use cases of stopping? We implemented it because we wanted to
ensure that the application shuts down correctly and we don't end up in an
inconsistent/broken state.

Thanks,
Peter


2018-06-11 11:31 GMT+02:00 Stefan Richter :

> Hi,
>
> > On 08.06.2018 at 01:16, Peter Zende wrote:
> >
> > Hi all,
> >
> > We have a streaming pipeline (Flink 1.4.2) for which we implemented
> stoppable sources to be able to gracefully exit from the job with Yarn
> state "finished/succeeded".
> > This works fine; however, after creating a savepoint, stopping the job
> (stop event) and restarting it, we noticed that the RocksDB state hadn't
> been recovered. It looks like this is because the state directory on HDFS
> was emptied after issuing a stop event. This isn't the case when we cancel
> the job, but we'd like to distinguish between job failures and stop events.
> After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still
> not clear why this is the intended behavior.
> > Should we use cancel instead?
>
> Savepoints should _not_ be cleaned up in case of stop or cancellation;
> checkpoints, on the other hand, should be cleaned up. Where are you storing
> the created savepoints? They should not go into the checkpoint directory.
> Stop is intended to be a more „graceful“ variant of cancel, but I think it
> is rarely used with Flink. I would prefer cancel unless you really need to
> use stoppable for some particular reason.
>
> > When we back up the local state directory, stop the job, copy the
> directory back and start a new job from the savepoint, then it works fine.
> > Another issue is that when we restart the job with different sources (1st
> job: HDFS and Kafka; 2nd job: Kafka), each having uids set, the recovery
> from the savepoint doesn't fail, but the local state isn't restored. Is
> there any trick besides setting allowNonRestoredState?
>
>
> I need to clarify here: when you say „each having uids set“, do you set
> the same uids for both types of sources? The uids must match, because Flink
> reassigns state on restore based on the uids, i.e. state x goes to the
> operator whose uid matches the uid of the operator that created it in the
> previous job. The flag allowNonRestoredState exists to tolerate state from
> a checkpoint/savepoint that does not find a matching operator to which it
> should be assigned (no operator with a matching uid exists in the job
> graph). For example, you want this if you removed operators from the job.
>
> Best,
> Stefan
>
>


Re: Stopping of a streaming job empties state store on HDFS

2018-06-11 Thread Stefan Richter
Hi,

> On 08.06.2018 at 01:16, Peter Zende wrote:
> 
> Hi all,
> 
> We have a streaming pipeline (Flink 1.4.2) for which we implemented stoppable
> sources to be able to gracefully exit from the job with Yarn state
> "finished/succeeded".
> This works fine; however, after creating a savepoint, stopping the job (stop
> event) and restarting it, we noticed that the RocksDB state hadn't been
> recovered. It looks like this is because the state directory on HDFS was
> emptied after issuing a stop event. This isn't the case when we cancel the
> job, but we'd like to distinguish between job failures and stop events. After
> reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still not
> clear why this is the intended behavior.
> Should we use cancel instead?

Savepoints should _not_ be cleaned up in case of stop or cancellation;
checkpoints, on the other hand, should be cleaned up. Where are you storing
the created savepoints? They should not go into the checkpoint directory.
Stop is intended to be a more „graceful“ variant of cancel, but I think it is
rarely used with Flink. I would prefer cancel unless you really need to use
stoppable for some particular reason.
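
Just to illustrate keeping the two locations apart, a rough sketch (the
paths, the checkpoint interval and the retain setting below are only
examples, not a recommendation):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointVsSavepointDirs {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoints are managed (and cleaned up) by Flink and live under their own directory.
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    env.enableCheckpointing(60_000);

    // Optionally keep externalized checkpoints around when the job is cancelled.
    env.getCheckpointConfig()
        .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Savepoints should target a different directory, e.g. the cluster default
    // configured via state.savepoints.dir in flink-conf.yaml, or a directory
    // passed explicitly when triggering the savepoint
    // (bin/flink savepoint <jobId> hdfs:///flink/savepoints).

    // ... build the pipeline and call env.execute(...)
  }
}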

> When we back up the local state directory, stop the job, copy the directory
> back and start a new job from the savepoint, then it works fine.
> Another issue is that when we restart the job with different sources (1st job:
> HDFS and Kafka; 2nd job: Kafka), each having uids set, the recovery from the
> savepoint doesn't fail, but the local state isn't restored. Is there any trick
> besides setting allowNonRestoredState?


I need to clarify here: when you say „each having uids set“, do you set the
same uids for both types of sources? The uids must match, because Flink
reassigns state on restore based on the uids, i.e. state x goes to the
operator whose uid matches the uid of the operator that created it in the
previous job. The flag allowNonRestoredState exists to tolerate state from a
checkpoint/savepoint that does not find a matching operator to which it
should be assigned (no operator with a matching uid exists in the job graph).
For example, you want this if you removed operators from the job.
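
To make that concrete, a toy sketch could look like this (the sources and
uid strings are made up; the only point is that each stateful operator keeps
the same uid across both versions of the job):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-ins for the Kafka and HDFS inputs of the 1st job. State is re-assigned
    // by uid on restore, so each uid must stay stable across job versions.
    DataStream<String> kafkaLike = env
        .socketTextStream("localhost", 9000)
        .uid("kafka-source");

    DataStream<String> hdfsLike = env
        .readTextFile("hdfs:///input/history")   // dropped in the 2nd version of the job
        .uid("hdfs-source");

    kafkaLike.union(hdfsLike)
        .map(new MapFunction<String, String>() {
          @Override
          public String map(String value) {
            return value.toUpperCase();
          }
        })
        .uid("uppercase-mapper")   // keep this uid identical in both job versions
        .print();

    // When the 2nd job (Kafka only) is started from a savepoint of the 1st job,
    // allowNonRestoredState merely tolerates state whose uid no longer has a
    // matching operator (here "hdfs-source"); it does not re-map state between
    // different uids.
    env.execute("uid example");
  }
}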

Best,
Stefan



Stopping of a streaming job empties state store on HDFS

2018-06-07 Thread Peter Zende
Hi all,

We have a streaming pipeline (Flink 1.4.2) for which we implemented
stoppable sources to be able to gracefully exit from the job with Yarn
state "finished/succeeded".
This works fine; however, after creating a savepoint, stopping the job (stop
event) and restarting it, we noticed that the RocksDB state hadn't been
recovered. It looks like this is because the state directory on HDFS was
emptied after issuing a stop event. This isn't the case when we cancel the
job, but we'd like to distinguish between job failures and stop events.
After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still
not clear why this is the intended behavior.
Should we use cancel instead?
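
For reference, a simplified sketch of such a stoppable source (the class name
and details are made up, it just shows the idea):

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class StoppableCounterSource implements SourceFunction<Long>, StoppableFunction {

  private volatile boolean running = true;
  private long counter;

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
    while (running) {
      synchronized (ctx.getCheckpointLock()) {
        ctx.collect(counter++);
      }
      Thread.sleep(100);
    }
  }

  @Override
  public void cancel() {
    running = false;
  }

  @Override
  public void stop() {
    // Called on a "stop" request; exit the run() loop cleanly so the job
    // finishes instead of being cancelled.
    running = false;
  }
}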

When we back up the local state directory, stop the job, copy the directory
back and start a new job from the savepoint, then it works fine.
Another issue is that when we restart the job with different sources (1st
job: HDFS and Kafka; 2nd job: Kafka), each having uids set, the recovery
from the savepoint doesn't fail, but the local state isn't restored. Is
there any trick besides setting allowNonRestoredState?

Many thanks,
Peter