Jark Wu created FLINK-24607:
-------------------------------

             Summary: SourceCoordinator may miss to close SplitEnumerator when failover frequently
                 Key: FLINK-24607
                 URL: https://issues.apache.org/jira/browse/FLINK-24607
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.13.3
            Reporter: Jark Wu
         Attachments: jobmanager.log

We are seeing a connection leak when using the mysql-cdc [1] source. The JM log 
shows that many enumerators are never closed.

{code}
➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Restoring SplitEnumerator" | wc -l
     264
➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Starting split enumerator" | wc -l
     264
➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Starting enumerator" | wc -l
     263
➜  test123 cat jobmanager.log | grep "SourceCoordinator \[\] - Closing SourceCoordinator" | wc -l
     264
➜  test123 cat jobmanager.log | grep "MySqlSourceEnumerator \[\] - Closing enumerator" | wc -l
     195
{code}

We added a "Closing enumerator" log in {{MySqlSourceEnumerator#close()}} and a 
"Starting enumerator" log in {{MySqlSourceEnumerator#start()}}. The results above 
show that the {{SourceCoordinator}} restored and started a split enumerator 264 
times and was itself closed 264 times, while {{MySqlSourceEnumerator}} logged 263 
starts but only 195 closes. It seems that {{SourceCoordinator}} fails to close the 
enumerator when the job fails over frequently.
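The comparison above is plain counting: with the numbers shown, 263 logged starts minus 195 logged closes leaves 68 enumerators (and presumably their MySQL connections) unaccounted for. Below is a minimal Java sketch of the same check, assuming the log markers match the grep patterns above. The class and method names are hypothetical, and because it only compares totals it cannot tell which enumerator instance leaked.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class Main {
    // Marker strings assumed to match the log lines grepped in the issue.
    static final String STARTED = "MySqlSourceEnumerator [] - Starting enumerator";
    static final String CLOSED = "MySqlSourceEnumerator [] - Closing enumerator";

    /** Counts lines containing the marker (same idea as `grep ... | wc -l`). */
    static long count(List<String> lines, String marker) {
        return lines.stream().filter(line -> line.contains(marker)).count();
    }

    /** Starts without a matching close; count-based only, not per instance. */
    static long unclosed(List<String> lines) {
        return count(lines, STARTED) - count(lines, CLOSED);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical default path; pass the real log file as the first argument.
        Path log = Path.of(args.length > 0 ? args[0] : "jobmanager.log");
        List<String> lines = Files.readAllLines(log);
        System.out.printf("started=%d closed=%d unclosed=%d%n",
                count(lines, STARTED), count(lines, CLOSED), unclosed(lines));
    }
}
```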

I also went through the code of {{SourceCoordinator}} and found a suspicious 
point:

The {{started}} flag and the {{enumerator}} field are assigned in the main thread, 
whereas {{SourceCoordinator#close()}} is executed asynchronously by 
{{DeferrableCoordinator#closeAsync}}. That means {{close()}} reads {{started}} 
and {{enumerator}} from a different thread. Could there be a concurrency problem 
here that leads to a dirty (stale) read, so that the {{enumerator}} is never 
closed?
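If the race suspected above is real, one common way to rule it out is thread confinement: run every lifecycle step on the same single-threaded executor, so that close can never interleave with, or miss the writes of, an earlier start, and a start that arrives after close becomes a no-op. The Java sketch below illustrates that technique only. {{ConfinedCoordinator}}, {{FakeEnumerator}} and the {{OPEN}} counter are hypothetical stand-ins, not Flink's {{SourceCoordinator}} or {{SplitEnumerator}} API, and this is not necessarily how the issue is (or should be) fixed in Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical enumerator holding a resource, e.g. a database connection. */
class FakeEnumerator implements AutoCloseable {
    // Enumerators started but not yet closed ("leaked connections").
    static final AtomicInteger OPEN = new AtomicInteger();

    void start() {
        OPEN.incrementAndGet();
    }

    @Override
    public void close() {
        OPEN.decrementAndGet();
    }
}

/** Hypothetical coordinator: all lifecycle state lives on one thread. */
class ConfinedCoordinator {
    private final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();
    // Only touched by tasks running on coordinatorThread, so no volatile or locks needed.
    private FakeEnumerator enumerator;
    private boolean started;
    private boolean closed;

    Future<?> start() {
        return coordinatorThread.submit(() -> {
            if (closed) {
                return; // close already ran: don't open a resource nobody will release
            }
            enumerator = new FakeEnumerator();
            enumerator.start();
            started = true;
        });
    }

    Future<?> closeAsync() {
        return coordinatorThread.submit(() -> {
            closed = true;
            if (started && enumerator != null) {
                enumerator.close();
            }
        });
    }

    void shutdown() throws InterruptedException {
        coordinatorThread.shutdown();
        coordinatorThread.awaitTermination(10, TimeUnit.SECONDS);
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService callers = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 1_000; i++) {
            ConfinedCoordinator coordinator = new ConfinedCoordinator();
            // Simulate a failover: start and close are requested concurrently from two threads.
            CompletableFuture<Future<?>> start = CompletableFuture.supplyAsync(coordinator::start, callers);
            CompletableFuture<Future<?>> close = CompletableFuture.supplyAsync(coordinator::closeAsync, callers);
            start.get().get();
            close.get().get();
            coordinator.shutdown();
        }
        callers.shutdown();
        System.out.println("enumerators left open: " + FakeEnumerator.OPEN.get());
    }
}
```

Running it prints `enumerators left open: 0` whichever request wins each race, because a close that arrives first sets {{closed}} and turns the later start into a no-op. In this sketch, marking the fields {{volatile}} without confinement would fix visibility but not that ordering case: a close that runs before start would see {{started == false}}, and the enumerator opened afterwards would never be closed.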

I'm still not sure, because it's hard to reproduce locally and we can't deploy 
a custom Flink version to our production environment.


[1]: https://github.com/ververica/flink-cdc-connectors



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
