[ 
https://issues.apache.org/jira/browse/FLINK-13609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16916521#comment-16916521
 ] 

Kostas Kloudas commented on FLINK-13609:
----------------------------------------

I agree that the fact that the part-counter for a new bucket is not 0 is 
counter-intuitive, but this is the expected behaviour.

The reason for this can be illustrated with the following example (I assume a 
single bucket in the example for simplicity but you can extrapolate the problem 
to multiple buckets):
1) our job has parallelism 3, all tasks write to the same bucket (for 
simplicity), and all tasks have seen elements so we have:

      -> {{part-0-0, part-0-1, part-0-2}}
      -> {{part-1-0, part-1-1}}
      -> {{part-2-0, part-2-1, part-2-2, part-2-3}}


2) we scale down to parallelism 1. Now there is only one parallel instance 
writing, so we have the previous part files, plus:

      -> {{part-0-3, part-0-4, part-0-5, ...}}   

3) now we scale up again to 3 or more. In this case, what should the 
starting counter of subtask 1 or 2 be? There is nothing in state from the 
previous subtasks 1 and 2, as all files for those subtasks have been cleaned 
up, and the only thing we have is the max counter seen across all buckets so 
far.
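
The restore step described above can be modelled roughly as follows. This is a 
simplified sketch, not the actual {{StreamingFileSink}} code: the class 
{{PartCounterModel}} and both method names are hypothetical, and it only 
assumes what the example states, namely that the max counter is the one thing 
that survives in state.

```java
import java.util.List;

/** Simplified model of the counter hand-off on restore; not Flink's actual code. */
final class PartCounterModel {

    /**
     * Every restored subtask resumes from the largest counter found in state,
     * because per-subtask counters are assumed to be gone after rescaling.
     */
    static long restoreCounter(List<Long> countersInState) {
        long max = 0L;
        for (long counter : countersInState) {
            max = Math.max(max, counter);
        }
        return max;
    }

    /** Builds a name following the part-<subtask>-<counter> pattern used above. */
    static String partFileName(int subtaskIndex, long counter) {
        return "part-" + subtaskIndex + "-" + counter;
    }
}
```

With hypothetical counters 3, 2 and 5 in state, {{restoreCounter}} yields 5, so 
a new subtask 2 in this model would name its first file 
{{partFileName(2, 5)}}. Starting from the max is the only choice that cannot 
reuse a name that an earlier subtask with the same index already wrote.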

In addition, although this is what currently happens, there is no guarantee 
that subtask 0 will get the state of the previous subtask 0, even when 
resuming without rescaling. This complicates things even without rescaling.

If we were to keep everything in state forever (e.g. max counter per bucket or 
per task per bucket), then this could blow up the state.

In the rolling sink (the predecessor of the {{StreamingFileSink}}) we assumed a 
consistent file system, so we started the counter from 0 and asked whether 
{{part-X-0}} exists. If it did, we would increase the counter by one and try 
again; if not, we would use that name. For the {{StreamingFileSink}} we make 
no such assumption, as we want it to work properly also with filesystems like 
S3.
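
For illustration, the probing approach of the rolling sink looks roughly like 
the sketch below. It is not the rolling sink's actual code: it uses 
{{java.nio.file}} on a local directory instead of Flink's file system 
abstraction, and the class and method names are hypothetical.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch of "probe until the name is free"; assumes a consistent file system. */
final class ProbingCounterSketch {

    /**
     * Starts at counter 0 and increments until part-<subtask>-<counter> does not
     * exist. This is only safe when exists() reliably sees files written earlier.
     */
    static Path firstFreePartFile(Path bucketDir, int subtaskIndex) {
        long counter = 0L;
        Path candidate = bucketDir.resolve("part-" + subtaskIndex + "-" + counter);
        while (Files.exists(candidate)) {
            counter++;
            candidate = bucketDir.resolve("part-" + subtaskIndex + "-" + counter);
        }
        return candidate;
    }
}
```

The loop is only correct if the existence check reflects every earlier write, 
which is exactly the assumption the {{StreamingFileSink}} avoids.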

> StreamingFileSink - reset part counter on bucket change
> -------------------------------------------------------
>
>                 Key: FLINK-13609
>                 URL: https://issues.apache.org/jira/browse/FLINK-13609
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Joao Boto
>            Priority: Major
>
> When writing files with StreamingFileSink, we expect the part counter to 
> reset to 0 on bucket change.
> As an example:
>  * using DateTimeBucketAssigner with {{yyyy/MM/dd/HH}}
>  * and ten files per hour (for simplicity)
> this will create:
>  * bucket 2019/08/07/00 with files partfile-0-0 to partfile-0-9
>  * bucket 2019/08/07/01 with files partfile-0-10 to partfile-0-19
>  * bucket 2019/08/07/02 with files partfile-0-20 to partfile-0-29
> and we expect this:
>  * bucket 2019/08/07/00 with files partfile-0-0 to partfile-0-9
>  * bucket 2019/08/07/01 with files partfile-0-0 to partfile-0-9
>  * bucket 2019/08/07/02 with files partfile-0-0 to partfile-0-9
>  
> [~kkl0u] I don't know if this is the expected behavior (or whether it can 
> be configured)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
