[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Weise reassigned FLINK-33402:
------------------------------------

    Assignee: Varun Narayanan Chakravarthy

> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in 
> Data Loss
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-33402
>                 URL: https://issues.apache.org/jira/browse/FLINK-33402
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc. 
>            Reporter: Varun Narayanan Chakravarthy
>            Assignee: Varun Narayanan Chakravarthy
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using Hybrid Source. We are reading 
> from a series of ~100 concrete File Sources, all chained together using the 
> Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code: the 
> Hybrid Source switches to the next source before the current source is 
> complete. The same applies to the Hybrid Source readers. I have also shared a 
> patch file that fixes the issue.
> From the logs:
> *Task Manager logs:* 
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=0000000229 position=null]
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished reading split(s) [0000000154]
> 2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Finished reading from splits [0000000154]
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader received NoMoreSplits event.
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing Source Reader.
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting down split fetcher 0
> 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split fetcher 0 exited.
> 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. 
> This split is assigned to the reader with ID 0000000229. As the logs show, the 
> split is added after the NoMoreSplits event and is never read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote 
> split to requesting host '10': Optional[FileSourceSplit: 
> s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=0000000229 
> position=null]
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Assigned split 
> to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  
> hosts=[localhost] ID=0000000229 position=null
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
> o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
> parquet-source received split request from parallel task 1 (#0)
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
> subtask=1 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 1 (on 
> host '10.4.168.40') is requesting a file source split
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote 
> split to requesting host '10': Optional[FileSourceSplit: 
> s3://REDACTED/part-0-13127.snappy [0, 88108)  hosts=[localhost] ID=0000000045 
> position=null]
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Assigned split 
> to subtask 1 : FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108)  
> hosts=[localhost] ID=0000000045 position=null
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
> o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
> parquet-source received split request from parallel task 0 (#0)
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
> subtask=0 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 0 (on 
> host '10.4.192.125') is requesting a file source split
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - No more splits 
> available for subtask 0
> 2023-10-10 17:46:24.049 [SourceCoordinator-Source: parquet-source] INFO  
> o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
> parquet-source received split request from parallel task 1 (#0)
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
> subtask=1 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 1 (on 
> host '10.4.168.40') is requesting a file source split
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  
> o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - No more splits 
> available for subtask 1
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
> parquet-source received custom event from parallel task 1 (#0): 
> SourceReaderFinishedEvent{sourceIndex=11}
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSourceEvent 
> SourceReaderFinishedEvent{sourceIndex=11} subtask=1 pendingSplits={}
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
> o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - All readers 
> finished, ready to switch enumerator!
> The assigned split is never processed.
> I traced the race conditions to the HybridSourceSplitEnumerator and the 
> HybridSourceSplitReader.
> There are races on both the enumerator and the reader side. The attached 
> patch ensures that the switch from one source to another, and from one reader 
> to another, happens atomically with respect to the rest of the code. All 
> sections of the code that use the currentReader or currentEnumerator are 
> read-locked, and the code for the reader/enumerator switch is write-locked. 
> This ensures that no other function executes while the reader/enumerator 
> switch occurs. Applying only the fixes to HybridSourceSplitEnumerator 
> resolves most of the data loss, but not all of it; for complete correctness, 
> fixes are needed in both locations. Additionally, the current reader needs to 
> be reset before proceeding.
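The locking scheme described above can be sketched as follows. This is a hypothetical illustration of the read-lock/write-lock pattern, not the actual patch code; `SwitchGuard`, `withCurrent`, and `switchTo` are invented names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

// Illustrative guard around a switchable delegate (a reader or an enumerator).
class SwitchGuard<T> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private T current;

    SwitchGuard(T initial) { current = initial; }

    // Every code path that merely uses the current delegate takes the read
    // lock; many such calls can proceed concurrently.
    <R> R withCurrent(Function<T, R> action) {
        lock.readLock().lock();
        try {
            return action.apply(current);
        } finally {
            lock.readLock().unlock();
        }
    }

    // The switch takes the write lock, excluding all readers, so no caller
    // ever observes a half-switched delegate.
    void switchTo(T next) {
        lock.writeLock().lock();
        try {
            current = next;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

With this pattern, a split assignment running under `withCurrent` cannot interleave with a source switch running under `switchTo`, which is the atomicity property the patch aims for.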
> With these fixes applied, our team, running Flink at a scale of 1B+ 
> records/hour with 180 Task Managers, did not observe any data loss. There was 
> also no noticeable performance impact from the read-write mutexes and 
> concurrency control.
> Additionally, comparison of boxed Integer objects needs to use `equals`; 
> reference comparison with `==` fails for values above 127, since only values 
> in -128..127 are cached. This 
> [issue|https://www.mail-archive.com/issues@flink.apache.org/msg647008.html] 
> has been reported before by another user.
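As a minimal standalone demonstration of that boxing pitfall (not the Flink code in question):

```java
public class IntegerCacheDemo {
    public static void main(String[] args) {
        // Autoboxing goes through Integer.valueOf, which caches -128..127,
        // so small values share one object while larger values do not.
        Integer small1 = 127, small2 = 127;
        Integer big1 = 128, big2 = 128;
        System.out.println(small1 == small2);   // true: same cached object
        System.out.println(big1 == big2);       // false on a default JVM
        System.out.println(big1.equals(big2));  // true: value comparison
    }
}
```

`==` on boxed Integers compares references, so code comparing subtask or source indices as objects works only by accident for small values and silently breaks past 127.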
> If the above fixes are valid, please let me know. I would be happy to create 
> a branch and PR against the repo. I have completed and signed the individual 
> CLA and will be emailing it soon.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
