[ 
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785549#comment-17785549
 ] 

Thomas Weise commented on FLINK-33402:
--------------------------------------

[~varun1729dd] thanks for investigating this. It would be helpful to better 
understand why we see the race condition, as under the mailbox model this 
should not happen. Let's continue the discussion on the PR.

> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in 
> Data Loss
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-33402
>                 URL: https://issues.apache.org/jira/browse/FLINK-33402
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc. 
>            Reporter: Varun Narayanan Chakravarthy
>            Assignee: Varun Narayanan Chakravarthy
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using the Hybrid Source. We are reading 
> from a series of ~100 concrete File Sources, all chained together using the 
> Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code: the 
> Hybrid Source switches to the next source before the current source is 
> complete. The same happens on the Hybrid Source reader side. I have also 
> shared the patch file that fixes the issue.
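> For context, here is a minimal sketch of how such a chained source is typically 
> constructed with Flink's HybridSource builder. The text-line format, paths, 
> topic, and the class name HybridSourceExample are illustrative placeholders 
> only; the actual job chains ~100 Parquet file sources ahead of a Kafka source.
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.source.hybrid.HybridSource;
> import org.apache.flink.connector.file.src.FileSource;
> import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class HybridSourceExample {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         // Bounded file source for the historical data (the real job chains ~100 of these).
>         FileSource<String> fileSource =
>                 FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://REDACTED/"))
>                         .build();
> 
>         // Unbounded Kafka source that the hybrid source switches to at the end.
>         KafkaSource<String> kafkaSource =
>                 KafkaSource.<String>builder()
>                         .setBootstrapServers("kafka:9092")
>                         .setTopics("events")
>                         .setStartingOffsets(OffsetsInitializer.earliest())
>                         .setValueOnlyDeserializer(new SimpleStringSchema())
>                         .build();
> 
>         // Chain the sources: each is read to completion before switching to the next.
>         HybridSource<String> hybridSource =
>                 HybridSource.<String>builder(fileSource)
>                         .addSource(kafkaSource)
>                         .build();
> 
>         env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "parquet-source").print();
>         env.execute();
>     }
> }
> {code}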
> From the logs:
> *Task Manager logs:* 
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=0000000229 position=null]
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished reading split(s) [0000000154]
> 2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Finished reading from splits [0000000154]
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader received NoMoreSplits event.
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing Source Reader.
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting down split fetcher 0
> 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split fetcher 0 exited.
> 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. 
> This split (ID=0000000229) is assigned to the reader, and as the logs show it 
> is added after the NoMoreSplits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote 
> split to requesting host '10': Optional[FileSourceSplit: 
> s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=0000000229 
> position=null]
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Assigned split 
> to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  
> hosts=[localhost] ID=0000000229 position=null
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
> o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
> parquet-source received split request from parallel task 1 (#0)
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
> subtask=1 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 1 (on 
> host '10.4.168.40') is requesting a file source split
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote 
> split to requesting host '10': Optional[FileSourceSplit: 
> s3://REDACTED/part-0-13127.snappy [0, 88108)  hosts=[localhost] ID=0000000045 
> position=null]
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Assigned split 
> to subtask 1 : FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108)  
> hosts=[localhost] ID=0000000045 position=null
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
> o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
> parquet-source received split request from parallel task 0 (#0)
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
> subtask=0 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 0 (on 
> host '10.4.192.125') is requesting a file source split
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - No more splits 
> available for subtask 0
> 2023-10-10 17:46:24.049 [SourceCoordinator-Source: parquet-source] INFO  
> o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
> parquet-source received split request from parallel task 1 (#0)
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
> subtask=1 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 1 (on 
> host '10.4.168.40') is requesting a file source split
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - No more splits 
> available for subtask 1
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
> parquet-source received custom event from parallel task 1 (#0): 
> SourceReaderFinishedEvent{sourceIndex=11}
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSourceEvent 
> SourceReaderFinishedEvent{sourceIndex=11} subtask=1 pendingSplits={}
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - All readers 
> finished, ready to switch enumerator!
> The assigned split is never processed.
> I traced the race condition to the HybridSourceSplitEnumerator and the 
> HybridSourceReader.
> There are races on both the enumerator and the reader side. The attached patch 
> ensures that the switch from one source to another, and from one reader to 
> another, happens atomically with respect to the rest of the code. All sections 
> of the code that use the currentReader or currentEnumerator are read-locked, 
> and the code that performs the reader/enumerator switch is write-locked. This 
> ensures that no other function executes while the reader/enumerator switch is 
> in progress. Applying only the fixes to HybridSourceSplitEnumerator resolves 
> the majority of the data loss, but not all of it; for complete correctness, 
> fixes are needed in both locations. Additionally, the current reader also 
> needs to be reset before proceeding.
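> To illustrate the locking scheme described above, here is a minimal, 
> hypothetical sketch; SwitchableReader, Reader, poll, and switchTo are 
> illustrative names rather than Flink classes, and the actual changes are in 
> the attached patch.
> {code:java}
> import java.util.concurrent.locks.ReadWriteLock;
> import java.util.concurrent.locks.ReentrantReadWriteLock;
> 
> // Hypothetical wrapper showing the pattern: every use of the current delegate
> // is read-locked, and the switch to the next delegate is write-locked.
> class SwitchableReader<T> {
> 
>     private final ReadWriteLock lock = new ReentrantReadWriteLock();
>     private Reader<T> currentReader;
> 
>     SwitchableReader(Reader<T> initialReader) {
>         this.currentReader = initialReader;
>     }
> 
>     // Delegating calls take the read lock, so they may run concurrently with
>     // each other but never overlap with a switch.
>     T poll() {
>         lock.readLock().lock();
>         try {
>             return currentReader.poll();
>         } finally {
>             lock.readLock().unlock();
>         }
>     }
> 
>     // The switch takes the write lock, making it atomic with respect to all
>     // read-locked sections: no other function executes while it runs.
>     void switchTo(Reader<T> nextReader) {
>         lock.writeLock().lock();
>         try {
>             currentReader.close();      // close/reset the current reader first
>             currentReader = nextReader;
>         } finally {
>             lock.writeLock().unlock();
>         }
>     }
> 
>     // Minimal delegate interface, only for this sketch.
>     interface Reader<T> {
>         T poll();
>         void close();
>     }
> }
> {code}
> Only the switch itself is exclusive; read-locked delegating calls can still 
> proceed concurrently with each other, which keeps the locking overhead low.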
> With these fixes applied, our team, running Flink at a scale of 1B+ 
> records/hour with 180 Task Managers, did not see any data loss. There was also 
> no noticeable performance impact from the read-write locks and added 
> concurrency control.
> Additionally, comparison of boxed Integer objects needs to use `equals`; `==` 
> compares references and only works reliably for values inside the Integer 
> cache range (-128..127). This 
> [issue|https://www.mail-archive.com/issues@flink.apache.org/msg647008.html] 
> has been reported before by another user.
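> As a small self-contained illustration of the boxed-Integer pitfall (the class 
> name IntegerCompare exists only for this example):
> {code:java}
> public class IntegerCompare {
>     public static void main(String[] args) {
>         Integer a = 127, b = 127;   // values inside the Integer cache (-128..127)
>         Integer c = 200, d = 200;   // values outside the cache: distinct objects
> 
>         System.out.println(a == b);      // true, both references come from the cache
>         System.out.println(c == d);      // false, '==' compares references
>         System.out.println(c.equals(d)); // true, equals() compares the values
>     }
> }
> {code}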
> If the above fixes are valid, please let me know. I would be happy to create 
> a branch and PR against the repo. I have completed and signed the individual 
> CLA and will be emailing it soon.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
