Hi Pawel,

You are correct that counters are unique within the same bucket but
NOT across buckets. Across buckets, you may see the same counter being
used.
The max counter is used only upon restoring from a failure, resuming
from a savepoint, or rescaling, and this is done to guarantee that no
valid data are overwritten while limiting the state that Flink has to
keep internally. For a more detailed discussion of the why, you can
have a look here: https://issues.apache.org/jira/browse/FLINK-13609
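To make the mechanics concrete, here is a heavily simplified, hypothetical sketch of the behaviour described above. This is NOT Flink's actual code; the class shapes loosely mirror Bucket/Buckets, but names like rollPartFile and snapshotMaxPartCounter are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class PartCounterSketch {

    /** One active bucket; the part counter is only unique within this bucket. */
    static class Bucket {
        private long partCounter;

        Bucket(long initialPartCounter) {
            this.partCounter = initialPartCounter;
        }

        /** Rolling over a part file uses the current counter, then increments it. */
        String rollPartFile(int subtaskIndex) {
            return "part-" + subtaskIndex + "-" + partCounter++;
        }

        long getPartCounter() {
            return partCounter;
        }
    }

    /** Tracks all active buckets plus a global max counter. */
    static class Buckets {
        private final Map<String, Bucket> activeBuckets = new HashMap<>();
        private long maxPartCounter;

        Bucket getOrCreateBucket(String bucketId) {
            // A new bucket starts from the global max, so after a restore,
            // savepoint resume, or rescale, newly created part files cannot
            // clash with files already committed by any earlier bucket.
            return activeBuckets.computeIfAbsent(
                    bucketId, id -> new Bucket(maxPartCounter));
        }

        /** Called on checkpoint: remember the largest counter of any bucket. */
        long snapshotMaxPartCounter() {
            for (Bucket b : activeBuckets.values()) {
                maxPartCounter = Math.max(maxPartCounter, b.getPartCounter());
            }
            return maxPartCounter;
        }
    }
}
```

Two buckets created at the same time both start from the same value and increment independently, so they can both produce e.g. "part-0-8"; that is fine because the full path includes the bucket id, and only the checkpointed max counter matters on recovery.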

Cheers,
Kostas

On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek
<pawelbartosze...@gmail.com> wrote:
>
> I have looked into the source code and it looks like the same counter 
> value being used in two buckets is correct behaviour.
> Each Bucket class 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
>  is passed partCounter in the constructor. Whenever a part file is rolled 
> over, the counter is incremented within the scope of that bucket. It can 
> happen that there are two or more active buckets whose counters are increased 
> independently, so that they become equal. However, the global max 
> counter maintained by the Buckets class always keeps the max part counter, 
> so that when a new bucket is created it is passed the correct part counter.
>
> I have done my analysis based on the logs from my job. I highlighted the same 
> counter value used for part-0-8.
>
> 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
> 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> received completion notification for checkpoint with id=7.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing for checkpoint with id=8 (max part counter=7).
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
> checkpoint.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
> element
> 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> received completion notification for checkpoint with id=8.
> 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to 
> element
> 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing for checkpoint with id=9 (max part counter=9).
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
> checkpoint.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_55_00Z on 
> checkpoint.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
> element
> 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-8" for bucket id=2020-01-24T14_54_00Z.
> 2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> received completion notification for checkpoint with id=9.
> 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to 
> element
> 2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-9" for bucket id=2020-01-24T14_55_00Z.
> 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing for checkpoint with id=10 (max part counter=10).
> 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
> checkpoint.
> 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_55_00Z on 
> checkpoint.
> 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and 
> bucketPath=s3://xxx
>
>
> Thanks,
> Pawel
>
>
> On Thu, 23 Jan 2020 at 23:29, Pawel Bartoszek <pawelbartosze...@gmail.com> 
> wrote:
>>
>> Hi,
>>
>>
>> The Flink StreamingFileSink is designed to use a global counter when creating 
>> files to avoid overwrites. I am running Flink 1.8.2 with Kinesis Analytics 
>> (managed Flink provided by AWS) with bulk writes (the rolling policy is 
>> hardcoded to roll over on checkpoint).
>> My job is configured to checkpoint every minute and runs with 
>> parallelism 1.
>>
>> The problem is that the same counter 616 is used for both files 
>> invalid-records/2020-01-22T15_06_00Z/part-0-616 and 
>> invalid-records/2020-01-22T15_05_00Z/part-0-616.
>>
>> 15:06:37
>> { "locationInformation": 
>> "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
>>  "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": 
>> "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID 
>> f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
>>  "threadName": "Async calls on Source: Custom Source -> Extract Td-agent 
>> message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)", 
>> "applicationARN": 
>> "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel", 
>> "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType": 
>> "INFO"}
>> 15:07:37
>> { "locationInformation": 
>> "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
>>  "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": 
>> "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID 
>> XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
>>  "threadName": "Async calls on Source: Custom Source -> Extract Td-agent 
>> message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)", 
>> "applicationARN": 
>> "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel", 
>> "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType": 
>> "INFO" }
>>
>> Thanks,
>> Pawel
