Hi,

Flink's StreamingFileSink is designed to use a global counter when creating files, to avoid overwrites. I am running Flink 1.8.2 on Kinesis Analytics (the managed Flink service provided by AWS) with bulk writes, so the rolling policy is hardcoded to roll over on checkpoint. The job is configured to checkpoint every minute and runs with parallelism 1.

The problem is that the same counter value, 616, is used for two files:

invalid-records/2020-01-22T15_06_00Z/part-0-616
invalid-records/2020-01-22T15_05_00Z/part-0-616

The relevant log entries:

15:06:37
{
    "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
    "logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
    "message": "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
    "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
    "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
    "applicationVersionId": "33",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}

15:07:37
{
    "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
    "logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
    "message": "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
    "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
    "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
    "applicationVersionId": "33",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}

Thanks,
Pawel
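
P.S. To make the comparison concrete, here is a minimal sketch that splits the two committed keys from the log messages into bucket, subtask index and counter. It assumes the keys follow the layout "<bucket>/part-<subtaskIndex>-<counter>" seen in the logs. PartFileName and parse are hypothetical names made up for this illustration; they are not part of Flink.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Flink class): decomposes a committed object key. */
public final class PartFileName {
    // Assumed layout, taken from the log messages: "<bucket>/part-<subtaskIndex>-<counter>".
    private static final Pattern KEY = Pattern.compile("^(.*)/part-(\\d+)-(\\d+)$");

    public final String bucket;
    public final int subtaskIndex;
    public final long counter;

    private PartFileName(String bucket, int subtaskIndex, long counter) {
        this.bucket = bucket;
        this.subtaskIndex = subtaskIndex;
        this.counter = counter;
    }

    /** Parses a key such as "invalid-records/2020-01-22T15_06_00Z/part-0-616". */
    public static PartFileName parse(String key) {
        Matcher m = KEY.matcher(key);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected key layout: " + key);
        }
        return new PartFileName(
                m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3)));
    }

    public static void main(String[] args) {
        // The two keys reported by S3Committer at 15:06:37 and 15:07:37.
        PartFileName first = parse("invalid-records/2020-01-22T15_06_00Z/part-0-616");
        PartFileName second = parse("invalid-records/2020-01-22T15_05_00Z/part-0-616");

        System.out.println("same subtask index: " + (first.subtaskIndex == second.subtaskIndex));
        System.out.println("same counter:       " + (first.counter == second.counter));
        System.out.println("same bucket:        " + first.bucket.equals(second.bucket));
    }
}
```

Run against the two keys above, this reports the same subtask index (0) and the same counter (616), with only the bucket part of the key differing.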