Hi Pawel,

You are correct that the write method invocation is guaranteed to be
thread-safe for the same operator subtask.
But I am not sure if having a unique counter per subtask across
buckets would add much to the user experience of the sink.
I think that in both cases, the interpretation of the part files would
be the same.
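
To illustrate why: the part file path already embeds the bucket id, so
equal counters in different buckets can never collide. Simplified from
Bucket#assembleNewPartPath() in 1.8 (org.apache.flink.core.fs.Path):

    // "2020-01-24T14_54_00Z/part-0-8" and "2020-01-24T14_55_00Z/part-0-8"
    // are distinct objects even though the counter value is the same.
    Path assembleNewPartPath(Path bucketPath, int subtaskIndex, long partCounter) {
        return new Path(bucketPath, "part-" + subtaskIndex + "-" + partCounter);
    }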

I may be wrong though so please let me know if this is a deal breaker for you.

Cheers,
Kostas


On Sat, Jan 25, 2020 at 11:48 AM Pawel Bartoszek
<pawelbartosze...@gmail.com> wrote:
>
> Hi Kostas,
>
> Thanks for confirming that. I started thinking it might be useful, or more
> user friendly, to use a unique counter across buckets for the same operator
> subtask.
> The way I could imagine this working is to pass the max counter to the
> https://github.com/apache/flink/blob/e7e24471240dbaa6b5148d406575e57d170b1623/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L204
> write method? Or each bucket could hold an instance of the Buckets class and
> access the global counter from there? As far as I know, the write method
> invocation is guaranteed to be thread-safe for the same operator subtask.
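>
> Roughly what I have in mind (a toy sketch with made-up names, not the
> real Flink classes):
>
>     // One subtask-wide counter is used for every new part file, so the
>     // numbers are unique across all buckets of the same subtask.
>     class GlobalCounterBuckets {
>         private long maxPartCounter; // shared across all buckets
>
>         String openPartFile(String bucketId, int subtaskIndex) {
>             return bucketId + "/part-" + subtaskIndex + "-" + (maxPartCounter++);
>         }
>     }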
>
> Thanks,
> Pawel
>
>
> On Fri, 24 Jan 2020 at 20:45, Kostas Kloudas <kklou...@gmail.com> wrote:
>>
>> Hi Pawel,
>>
>> You are correct that counters are unique within the same bucket but
>> NOT across buckets. Across buckets, you may see the same counter being
>> used.
>> The max counter is used only upon restoring from a failure, resuming
>> from a savepoint, or rescaling, and this is done to guarantee that no
>> valid data are overwritten while limiting the state that Flink has to
>> keep internally. For a more detailed discussion about the why, you can
>> have a look here: https://issues.apache.org/jira/browse/FLINK-13609
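>>
>> To sketch the restore logic (simplified from Buckets#initializeState;
>> the values below stand in for what is read from the union list state):
>>
>>     // On restore, the subtask resumes from the max over all restored
>>     // counters, so new part files never reuse a committed number.
>>     long[] restoredMaxCounters = {6L, 4L}; // example restored values
>>     long maxPartCounter = 0L;
>>     for (long restoredCounter : restoredMaxCounters) {
>>         maxPartCounter = Math.max(maxPartCounter, restoredCounter);
>>     }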
>>
>> Cheers,
>> Kostas
>>
>> On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek
>> <pawelbartosze...@gmail.com> wrote:
>> >
>> > I have looked into the source code, and it looks like the same
>> > counter value being used in two buckets is correct.
>> > Each Bucket class
>> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
>> > is passed partCounter in the constructor. Whenever a part file is rolled
>> > over, the counter is incremented within the scope of that bucket. It can
>> > happen that there are two or more active buckets whose counters are
>> > increased independently until they become equal. However, the global
>> > max counter maintained by the Buckets class always keeps the max part
>> > counter, so that when a new bucket is created it is passed the correct
>> > part counter.
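>> >
>> > In pseudo-Java, my understanding of the current behaviour is roughly
>> > this (a simplified model, not the real classes):
>> >
>> >     // Each bucket rolls its own counter; Buckets only tracks the max to
>> >     // seed *new* buckets, so two active buckets can reach the same value.
>> >     class PerBucketCounter {
>> >         private long partCounter;
>> >
>> >         PerBucketCounter(long maxPartCounter) {
>> >             this.partCounter = maxPartCounter; // new buckets start at the global max
>> >         }
>> >
>> >         String rollPartFile(String bucketId, int subtaskIndex) {
>> >             return bucketId + "/part-" + subtaskIndex + "-" + (partCounter++);
>> >         }
>> >     }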
>> >
>> > I based my analysis on the logs from my job. Note the same counter
>> > value being used for part-0-8 in two different buckets below.
>> >
>> > 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 received completion notification for checkpoint with id=7.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
>> > checkpoint.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
>> > bucketPath=s3://xxx
>> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
>> > element
>> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 received completion notification for checkpoint with id=8.
>> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to 
>> > element
>> > 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing for checkpoint with id=9 (max part counter=9).
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
>> > checkpoint.
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
>> > bucketPath=s3://xxx
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on 
>> > checkpoint.
>> > 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and 
>> > bucketPath=s3://xxx
>> > 2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
>> > element
>> > 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 received completion notification for checkpoint with id=9.
>> > 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to 
>> > element
>> > 2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-9" for bucket id=2020-01-24T14_55_00Z.
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing for checkpoint with id=10 (max part counter=10).
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
>> > checkpoint.
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
>> > bucketPath=s3://xxx
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on 
>> > checkpoint.
>> > 2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and 
>> > bucketPath=s3://xxx
>> >
>> >
>> > Thanks,
>> > Pawel
>> >
>> >
>> > On Thu, 23 Jan 2020 at 23:29, Pawel Bartoszek <pawelbartosze...@gmail.com> 
>> > wrote:
>> >>
>> >> Hi,
>> >>
>> >>
>> >> The Flink StreamingFileSink is designed to use a global counter when
>> >> creating files to avoid overwrites. I am running Flink 1.8.2 on Kinesis
>> >> Analytics (managed Flink provided by AWS) with bulk writes (the rolling
>> >> policy is hardcoded to roll over on checkpoint).
>> >> My job is configured to checkpoint every minute and runs with
>> >> parallelism 1.
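>> >>
>> >> For reference, the sink is set up roughly like this (the writer factory,
>> >> record type and bucket format below are placeholders, not my exact code):
>> >>
>> >>     // Bulk formats in 1.8 always roll on checkpoint, which is why I
>> >>     // expect one new part file per bucket per checkpoint.
>> >>     StreamingFileSink<MyRecord> sink = StreamingFileSink
>> >>         .forBulkFormat(new Path("s3://xxx/invalid-records"),
>> >>                        ParquetAvroWriters.forReflectRecord(MyRecord.class))
>> >>         .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd'T'HH_mm_00'Z'"))
>> >>         .build();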
>> >>
>> >> The problem is that the same counter 616 is used for both files 
>> >> invalid-records/2020-01-22T15_06_00Z/part-0-616 and 
>> >> invalid-records/2020-01-22T15_05_00Z/part-0-616.
>> >>
>> >> 15:06:37
>> >> { "locationInformation": 
>> >> "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
>> >>  "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": 
>> >> "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID 
>> >> f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
>> >>  "threadName": "Async calls on Source: Custom Source -> Extract Td-agent 
>> >> message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)", 
>> >> "applicationARN": 
>> >> "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel", 
>> >> "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType": 
>> >> "INFO"}
>> >> 15:07:37
>> >> { "locationInformation": 
>> >> "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
>> >>  "logger": "org.apache.flink.fs.s3.common.writer.S3Committer", "message": 
>> >> "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID 
>> >> XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
>> >>  "threadName": "Async calls on Source: Custom Source -> Extract Td-agent 
>> >> message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)", 
>> >> "applicationARN": 
>> >> "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel", 
>> >> "applicationVersionId": "33", "messageSchemaVersion": "1", "messageType": 
>> >> "INFO" }
>> >>
>> >> Thanks,
>> >> Pawel
