I have looked into the source code, and it looks like the same counter
value being used in two buckets is expected.
Each Bucket
(https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java)
is passed a partCounter in its constructor. Whenever a part file is rolled
over, the counter is incremented within the scope of that bucket only. When
there are two or more active buckets, their counters are incremented
independently, so they can end up with equal values. However, the Buckets
class maintains a global max part counter, so that a newly created bucket
is always passed the correct starting part counter.
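
To illustrate the behaviour described above, here is a minimal sketch in
Java. It is a simplified model, not the actual Flink code: the class
PartCounterSketch, its nested Bucket and Buckets classes, and the methods
write, closeOnCheckpoint, onElement, checkpoint and simulate are
hypothetical stand-ins that only mimic the counter handling. Real rolling
policies, state handling and removal of inactive buckets are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartCounterSketch {

    /** Simplified stand-in for one bucket; NOT the real Flink Bucket class. */
    static final class Bucket {
        private final String bucketId;
        private long partCounter;     // incremented only within this bucket
        private boolean inProgress;   // true while a part file is open

        Bucket(String bucketId, long initialPartCounter) {
            this.bucketId = bucketId;
            this.partCounter = initialPartCounter;
        }

        /** Opens a new part file if none is in progress; returns its path or null. */
        String write(int subtaskIndex) {
            if (inProgress) {
                return null;
            }
            String path = bucketId + "/part-" + subtaskIndex + "-" + partCounter;
            partCounter++;
            inProgress = true;
            return path;
        }

        /** Models "roll over on checkpoint": the in-progress file is closed. */
        void closeOnCheckpoint() {
            inProgress = false;
        }

        long getPartCounter() {
            return partCounter;
        }
    }

    /** Simplified stand-in for the bucket manager; NOT the real Flink Buckets class. */
    static final class Buckets {
        private final Map<String, Bucket> activeBuckets = new HashMap<>();
        private final List<String> openedFiles = new ArrayList<>();
        private long maxPartCounter;  // global max, handed to newly created buckets

        void onElement(String bucketId) {
            Bucket bucket = activeBuckets.computeIfAbsent(
                    bucketId, id -> new Bucket(id, maxPartCounter));
            String opened = bucket.write(0);
            if (opened != null) {
                openedFiles.add(opened);
            }
            maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
        }

        void checkpoint() {
            activeBuckets.values().forEach(Bucket::closeOnCheckpoint);
        }

        List<String> getOpenedFiles() {
            return openedFiles;
        }
    }

    /** Replays a small two-bucket scenario and returns the files opened, in order. */
    static List<String> simulate() {
        Buckets buckets = new Buckets();
        buckets.onElement("2020-01-24T14_54_00Z");
        buckets.checkpoint();
        buckets.onElement("2020-01-24T14_55_00Z");
        buckets.checkpoint();
        buckets.onElement("2020-01-24T14_54_00Z");
        buckets.onElement("2020-01-24T14_55_00Z");
        return buckets.getOpenedFiles();
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this model the second bucket starts at the global max (1) and opens
part-0-1, and after the next checkpoint the first bucket, whose own counter
is also 1 by then, opens part-0-1 as well. The counter value repeats across
buckets, but the full paths stay distinct because the bucket id is part of
the path.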

My analysis is based on the logs from my job, shown below. The same counter
value is used for part-0-8 in both buckets (the first occurrence is
highlighted with asterisks).

2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "part-0-6" for bucket
id=2020-01-24T14_54_00Z.
2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 received completion notification for checkpoint with id=7.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z
and bucketPath=s3://xxx
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "part-0-7" for bucket
id=2020-01-24T14_54_00Z.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 received completion notification for checkpoint with id=8.
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "*part-0-8*" for bucket
id=2020-01-24T14_55_00Z.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing for checkpoint with id=9 (max part counter=9).
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z
and bucketPath=s3://xxx
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z
and bucketPath=s3://xxx
2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "part-0-8" for bucket
id=2020-01-24T14_54_00Z.
2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 received completion notification for checkpoint with id=9.
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 opening new part file "part-0-9" for bucket
id=2020-01-24T14_55_00Z.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing for checkpoint with id=10 (max part
counter=10).
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z
and bucketPath=s3://xxx
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  -
Subtask 0 closing in-progress part file for bucket
id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  -
Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z
and bucketPath=s3://xxx


Thanks,
Pawel


On Thu, 23 Jan 2020 at 23:29, Pawel Bartoszek <pawelbartosze...@gmail.com>
wrote:

> Hi,
>
>
> Flink Streaming Sink is designed to use a global counter when creating
> files to avoid overwrites. I am running Flink 1.8.2 with Kinesis Analytics
> (managed Flink provided by AWS) with bulk writes (the rolling policy is
> hardcoded to roll over on checkpoint).
> My job is configured to checkpoint every minute. The job is running with
> parallelism 1.
>
> The problem is that the same counter 616 is used for both
> files invalid-records/2020-01-22T15_06_00Z/part-0-616
> and invalid-records/2020-01-22T15_05_00Z/part-0-616.
>
> 15:06:37
> { "locationInformation":
> "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
> "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": 
> "Committing
> invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID
> f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
> "threadName": "Async calls on Source: Custom Source -> Extract Td-agent
> message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
> "applicationARN":
> "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
> "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType":
> "INFO"}
> }
> 15:07:37
> { "locationInformation":
> "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
> "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": 
> "Committing
> invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID
> XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
> "threadName": "Async calls on Source: Custom Source -> Extract Td-agent
> message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
> "applicationARN":
> "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
> "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType":
> "INFO" }
>
> Thanks,
> Pawel
>
