Re: Fault tolerance in Flink file Sink

2020-04-27 Thread Kostas Kloudas
Hi Eyal and Dawid,

@Eyal I think Dawid explained pretty well what is happening and why, in
distributed settings, the underlying FS that the StreamingFileSink
writes to has to be durable and accessible to all parallel instances of
the job. Please let us know if you have any further questions.

Cheers,
Kostas

On Mon, Apr 27, 2020 at 9:52 AM Eyal Pe'er  wrote:
>
> Hi Dawid,
> Thanks for the very detailed answer and the correct assumptions (I am using 
> row format).
>
> I tried not to use NFS/S3, but it seems it is the only option I have.
>
> Best regards
>
> Eyal Peer


RE: Fault tolerance in Flink file Sink

2020-04-27 Thread Eyal Pe'er
Hi Dawid,
Thanks for the very detailed answer and the correct assumptions (I am using row 
format).
I tried not to use NFS/S3, but it seems it is the only option I have.
Best regards
Eyal Peer


Re: Fault tolerance in Flink file Sink

2020-04-24 Thread Dawid Wysakowicz
Forgot to cc Kostas



Re: Fault tolerance in Flink file Sink

2020-04-24 Thread Dawid Wysakowicz
Hi Eyal,

First of all, I would say a local filesystem is not the right choice for
what you are trying to achieve. I don't think you can achieve true
exactly-once guarantees in this setup. Let me elaborate why.

Let me clarify a bit how the StreamingFileSink works. The interesting
bit is how it behaves on checkpoints, and that behavior is controlled by
a RollingPolicy. As you have not said which format you use, let's assume
the row format. For a row format, the default rolling policy (which
decides when to move a file from in-progress to pending) rolls a file
once it reaches 128 MB, once it is older than 60 seconds, or once it has
not been written to for 60 seconds. It does not roll on a checkpoint.
Moreover, the StreamingFileSink treats the filesystem as a durable sink
that can be accessed after a restore, which implies that it will try to
append to the in-progress file when restoring from a
checkpoint/savepoint.
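
For illustration, the same defaults can be spelled out explicitly when
building the sink. A minimal sketch against the Flink 1.10 row-format
API (the output path and the String encoder below are placeholders, not
taken from your job):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    // Row-format sink with the default rolling thresholds written out.
    StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///data/out"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
            DefaultRollingPolicy.builder()
                .withMaxPartSize(128 * 1024 * 1024) // roll at 128 MB
                .withRolloverInterval(60_000)       // roll files older than 60 s
                .withInactivityInterval(60_000)     // roll after 60 s of no writes
                .build())
        .build();

Note the hdfs:// scheme in the sketch: on a durable, shared filesystem
the restore logic described below can actually find the files, which is
exactly what a local filesystem cannot offer.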

Even if you rolled the files on every checkpoint, you might still face
the problem of leftovers, because the StreamingFileSink moves the files
from pending to complete only after the checkpoint is completed. If a
failure happens between finishing the checkpoint and moving the files,
the sink will not be able to move them after a restore (it would do so
if it had access to them).
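
If you do want to roll on every checkpoint, there is a ready-made policy
for it. A sketch under the same assumptions as above (and note it does
not remove the need for a durable, shared filesystem):

    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

    // Rolls the in-progress file on every checkpoint instead of by
    // size, age or inactivity.
    StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///data/out"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();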

Lastly, a completed checkpoint contains the offsets of records that were
processed successfully end-to-end, meaning records that the
StreamingFileSink considers committed. These can be records written to
an in-progress file (with a pointer to it in the StreamingFileSink's
checkpointed metadata), records in a "pending" file (with an entry in
the checkpointed metadata marking the file as completed), or records in
"finished" files. [1]

Therefore, as you can see, there are multiple scenarios in which the
StreamingFileSink has to access its files after a restart.

One last thing: you mentioned committing to the "bootstrap-server". Bear
in mind that Flink does not use the offsets committed back to Kafka for
guaranteeing consistency. It can write those offsets back, but only for
monitoring/debugging purposes.[2] Flink stores and restores the
processed offsets from its checkpoints.[3]
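
To make that concrete, here is a hedged sketch of the consumer side
(Flink 1.10 Kafka connector; the topic name, group id and broker address
are placeholders):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // The offsets are snapshotted into every checkpoint; this is what
    // Flink restores from after a failure.
    env.enableCheckpointing(60_000);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "my-group");

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    // Offsets committed back to Kafka are only there for monitoring
    // tools; they play no role in Flink's consistency guarantees.
    consumer.setCommitOffsetsOnCheckpoints(true);

    env.addSource(consumer); // the StreamingFileSink sits at the end of this pipeline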

Let me know if this helped, I tried my best ;) BTW, I highly encourage
reading the linked sources, as they describe all of this in a more
structured way.

I am also cc'ing Kostas, who knows more about the StreamingFileSink than
I do, so he can correct me where needed.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration





Fault tolerance in Flink file Sink

2020-04-23 Thread Eyal Pe'er
Hi all,
I am using Flink streaming with the Kafka consumer connector
(FlinkKafkaConsumer) and the file sink (StreamingFileSink) in cluster
mode with an exactly-once policy.
The file sink writes the files to the local disk.
I've noticed that if a job fails and automatic restart is on, the task
managers look for the leftover files from the last failed job (hidden
files).
Obviously, since the tasks can be assigned to different task managers,
this adds up to more failures over and over again.
The only solution I have found so far is to delete the hidden files and
resubmit the job.
If I get it right (and please correct me if I am wrong), the events in
the hidden files were not committed to the bootstrap-server, so there is
no data loss.

Is there a way to force Flink to ignore the files that were already
written? Or is there a better way to implement this (maybe somehow with
savepoints)?

Best regards
Eyal Peer