Hi,

> Can't `OnCheckpointRollingPolicy` meet your requirements?
OnCheckpointRollingPolicy does not allow users to define rollover thresholds
such as inactivity time or window size.

However, that is beside the point. The question is what the default behaviour
should be, which I think deserves careful consideration for Flink users.

The fact that DefaultRollingPolicy does not roll inProgressPart files on
checkpoints has two ramifications:

1. Users cannot pick non-latest savepoints to restore from, even when they
have decided that replaying events is appropriate.
2. In the S3 implementation (I'm not sure whether this affects other
implementations), a MultiPartUpload (MPU) can expire. This leaves users with a
non-restorable savepoint, causing them to lose operator state.
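
To make the timing in point 2 concrete, here is a minimal sketch of the arithmetic only. It assumes a bucket lifecycle rule that aborts incomplete multipart uploads a fixed duration after initiation. The class and method names (`MpuExpiryCheck`, `mayBeExpired`) are hypothetical and not part of Flink or the AWS SDK, and the real lifecycle action runs asynchronously, so the actual abort may happen later than this estimate.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper, for illustration only; not a Flink or AWS SDK class. */
final class MpuExpiryCheck {
    /**
     * Returns true if an in-progress multipart upload initiated at initiatedAt
     * may already have been aborted when a restore is attempted at restoreAt,
     * assuming a lifecycle rule that aborts incomplete uploads abortAfter
     * after their initiation.
     */
    static boolean mayBeExpired(Instant initiatedAt, Duration abortAfter, Instant restoreAt) {
        Instant earliestAbort = initiatedAt.plus(abortAfter);
        // The upload is at risk from the earliest abort time onwards.
        return !restoreAt.isBefore(earliestAbort);
    }
}
```

A savepoint that references such an upload would be restorable only while `mayBeExpired` is false for the part file it points to.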


> Yes, it's by design.
Could you please point me to the reasoning behind the design decision?

Thank you
Keith


On 08/03/2023, 06:51, "yuxia" <luoyu...@alumni.sjtu.edu.cn> wrote:


Yes, it's by design.


I don't think we should change DefaultRollingPolicy. Can't 
`OnCheckpointRollingPolicy` meet your requirements?


Best regards,
Yuxia


----- Original Message -----
From: "Lee, Keith" <lee...@amazon.co.uk.invaLID>
To: "dev" <dev@flink.apache.org>
Sent: Tuesday, 7 March 2023, 10:39:44 PM
Subject: StreamingFileSink's DefaultRollingPolicy


Hi,


StreamingFileSink’s DefaultRollingPolicy does not always roll in-progress parts
to pending parts on checkpoints. Is this by design? It seems counterintuitive
to me that part files are not finished on checkpoint by default.


https://github.com/ueshin/apache-flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java#L70-L72
 




1. This has an impact on the S3 implementation, in that inProgressPart files
are MultiPartUploads, which can expire:
<https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html>
When the files do expire, jobs can no longer be started from savepoints, as
they will run into a FileNotFoundException for the missing in-progress part
file. This leaves users no option but to restart the job without a savepoint.
2. Having references to inProgressPart files within savepoints also prevents
users from restarting the job from earlier savepoints, should the user deem it
appropriate to replay the stream and rewrite the output. If the intention is
to prevent users from starting from an earlier savepoint, so that they do not
accidentally replay the stream (thereby violating end-to-end exactly-once),
the exception should be clearer.


May I propose that we change DefaultRollingPolicy to always roll 
inProgressParts to pending on checkpoint?
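
The proposed behaviour can be sketched roughly as follows. This is a standalone illustration, not Flink code: `PartFileState` and `ThresholdAndCheckpointPolicy` are hypothetical stand-ins, and the three method names are only loosely modelled on Flink's `RollingPolicy` interface. The size and time thresholds are kept, and the only change is that a checkpoint always triggers a roll.

```java
import java.time.Duration;

/** Hypothetical stand-in for the part-file information a rolling policy inspects. */
final class PartFileState {
    final long sizeBytes;
    final long creationTimeMs;
    final long lastUpdateTimeMs;

    PartFileState(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
        this.sizeBytes = sizeBytes;
        this.creationTimeMs = creationTimeMs;
        this.lastUpdateTimeMs = lastUpdateTimeMs;
    }
}

/** Sketch: threshold-based rolling that additionally rolls on every checkpoint. */
final class ThresholdAndCheckpointPolicy {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    ThresholdAndCheckpointPolicy(long maxPartSizeBytes,
                                 Duration rolloverInterval,
                                 Duration inactivityInterval) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverInterval.toMillis();
        this.inactivityIntervalMs = inactivityInterval.toMillis();
    }

    /** Always roll, so no checkpoint or savepoint references an in-progress part. */
    boolean shouldRollOnCheckpoint(PartFileState part) {
        return true;
    }

    /** Roll once the part file has grown past the size threshold. */
    boolean shouldRollOnEvent(PartFileState part) {
        return part.sizeBytes > maxPartSizeBytes;
    }

    /** Roll when the part is too old or has been idle for too long. */
    boolean shouldRollOnProcessingTime(PartFileState part, long nowMs) {
        return nowMs - part.creationTimeMs >= rolloverIntervalMs
                || nowMs - part.lastUpdateTimeMs >= inactivityIntervalMs;
    }
}
```

The trade-off of such a default is more, smaller part files when checkpoints are frequent, in exchange for savepoints that never depend on an unfinished upload.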


Thank you
Keith


