I have looked into the source code and it appears that the same counter value being used in two buckets is in fact correct behaviour. Each Bucket instance (https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java) is passed partCounter in its constructor. Whenever a part file is rolled over, the counter is incremented within the scope of that bucket only. When two or more buckets are active at the same time, their counters increase independently and can therefore end up equal. However, the Buckets class maintains a global max part counter, so whenever a new bucket is created it is seeded with the correct (maximum) part counter and cannot collide within its own bucket path.
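To illustrate the mechanism described above, here is a deliberately simplified, hypothetical model — these are not Flink's actual classes, and names like roll and getOrCreateBucket are my own — showing how a per-bucket counter plus a Buckets-level maximum can legitimately produce the same counter value under two different bucket paths:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch (NOT Flink's real implementation) of the
// part-counter bookkeeping: each bucket bumps its own copy of the
// counter, while a global maximum seeds newly created buckets.
public class PartCounterModel {

    // Per-bucket counter, seeded from the global max at creation time.
    static class Bucket {
        private final String bucketId;
        private long partCounter;

        Bucket(String bucketId, long initialPartCounter) {
            this.bucketId = bucketId;
            this.partCounter = initialPartCounter;
        }

        // Rolling a part file bumps only this bucket's counter.
        String rollPartFile() {
            String path = bucketId + "/part-0-" + partCounter;
            partCounter++;
            return path;
        }

        long getPartCounter() {
            return partCounter;
        }
    }

    private final Map<String, Bucket> activeBuckets = new HashMap<>();
    private long maxPartCounter = 0;

    // New buckets are seeded with the current global maximum, so a
    // fresh bucket never starts below any counter already handed out.
    Bucket getOrCreateBucket(String bucketId) {
        return activeBuckets.computeIfAbsent(
                bucketId, id -> new Bucket(id, maxPartCounter));
    }

    String roll(String bucketId) {
        Bucket bucket = getOrCreateBucket(bucketId);
        String path = bucket.rollPartFile();
        maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
        return path;
    }

    public static void main(String[] args) {
        PartCounterModel sink = new PartCounterModel();
        // Two buckets created while the global max is still 0 advance
        // their counters independently, so "part-0-0" appears under
        // BOTH bucket ids -- same file name, different paths.
        sink.getOrCreateBucket("bucketA");
        sink.getOrCreateBucket("bucketB");
        System.out.println(sink.roll("bucketA")); // bucketA/part-0-0
        System.out.println(sink.roll("bucketB")); // bucketB/part-0-0
        // A bucket created later is seeded from the global max (2),
        // so it skips the already-used values.
        System.out.println(sink.roll("bucketC")); // bucketC/part-0-2
    }
}
```

This matches what the logs below show: part-0-8 appears once under each of the 14_54 and 14_55 bucket ids, and the "max part counter" reported at checkpoint time keeps climbing monotonically.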
I have done my analysis based on the logs from my job. I have highlighted the duplicated counter value used for part-0-8.

2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=7.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=8.
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "*part-0-8*" for bucket id=2020-01-24T14_55_00Z.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=9 (max part counter=9).
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx
2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "*part-0-8*" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=9.
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 opening new part file "part-0-9" for bucket id=2020-01-24T14_55_00Z.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=10 (max part counter=10).
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx

Thanks,
Pawel

On Thu, 23 Jan 2020 at 23:29, Pawel Bartoszek <pawelbartosze...@gmail.com> wrote:

> Hi,
>
> The Flink StreamingFileSink is designed to use a global counter when
> creating files to avoid overwrites. I am running Flink 1.8.2 on Kinesis
> Analytics (managed Flink provided by AWS) with bulk writes (the rolling
> policy is hardcoded to roll over on checkpoint).
> My job is configured to checkpoint every minute. The job is running with
> parallelism 1.
>
> The problem is that the same counter 616 is used for both
> files invalid-records/2020-01-22T15_06_00Z/part-0-616
> and invalid-records/2020-01-22T15_05_00Z/part-0-616.
>
> 15:06:37
> {
>   "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
>   "logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
>   "message": "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
>   "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
>   "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
>   "applicationVersionId": "33",
>   "messageSchemaVersion": "1",
>   "messageType": "INFO"
> }
>
> 15:07:37
> {
>   "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
>   "logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
>   "message": "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
>   "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
>   "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
>   "applicationVersionId": "33",
>   "messageSchemaVersion": "1",
>   "messageType": "INFO"
> }
>
> Thanks,
> Pawel