Hi,

Flink's StreamingFileSink is designed to use a global part counter when creating
part files, precisely to avoid overwrites. I am running Flink 1.8.2 on Kinesis
Data Analytics (the managed Flink service provided by AWS) with bulk writes, so
the rolling policy is hardcoded to roll over on every checkpoint. The job
checkpoints every minute and runs with parallelism 1.
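
For completeness, the setup looks roughly like the sketch below. It is not the
exact production code: DataRecord is the job's own POJO, the Parquet writer
factory and the s3 path stand in for my real bulk format and bucket, and the
bucket assigner pattern is only illustrative of the per-minute buckets you see
in the logs.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class InvalidRecordsSinkSketch {

    public static void attachSink(StreamExecutionEnvironment env,
                                  DataStream<DataRecord> invalidRecords) {
        env.enableCheckpointing(60_000); // checkpoint every minute
        env.setParallelism(1);

        StreamingFileSink<DataRecord> sink = StreamingFileSink
                // Bulk format: in 1.8 the rolling policy is hardcoded to
                // OnCheckpointRollingPolicy, i.e. roll on every checkpoint.
                .forBulkFormat(
                        new Path("s3://my-bucket/invalid-records"),
                        ParquetAvroWriters.forReflectRecord(DataRecord.class))
                // One bucket per minute, e.g. 2020-01-22T15_06_00Z
                .withBucketAssigner(
                        new DateTimeBucketAssigner<>("yyyy-MM-dd'T'HH_mm_00'Z'"))
                .build();

        invalidRecords.addSink(sink);
    }
}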

The problem is that the same counter, 616, is used for two different files:
invalid-records/2020-01-22T15_06_00Z/part-0-616 and
invalid-records/2020-01-22T15_05_00Z/part-0-616.

15:06:37
{
  "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
  "logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
  "message": "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
  "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
  "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
  "applicationVersionId": "33",
  "messageSchemaVersion": "1",
  "messageType": "INFO"
}
15:07:37
{
  "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
  "logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
  "message": "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
  "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
  "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
  "applicationVersionId": "33",
  "messageSchemaVersion": "1",
  "messageType": "INFO"
}

Thanks,
Pawel
