gang ye created FLINK-27405:
-------------------------------

             Summary: Refactor SourceCoordinator to abstract BaseCoordinator 
implementation
                 Key: FLINK-27405
                 URL: https://issues.apache.org/jira/browse/FLINK-27405
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
            Reporter: gang ye


To solve the small-files problem caused by data skew, Flink Iceberg data 
shuffling was proposed (design doc: 
[https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit]).
The basic idea is to use a statistics operator to collect local statistics on 
the traffic distribution at the taskmanagers (workers). Local statistics are 
periodically sent to the statistics coordinator (running in the jobmanager). 
Once the globally aggregated statistics are ready, the statistics coordinator 
broadcasts them to all operator instances. A customized partitioner then uses 
the global statistics, passed down from the statistics operator, to distribute 
data to the Iceberg writers.
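
As a rough illustration of the flow described above, here is a minimal Java 
sketch. It is not taken from the design doc: the class and method names are 
hypothetical, statistics are assumed to be simple per-key record counts, and 
the greedy "heaviest key to the least-loaded writer" assignment is only one 
possible way a partitioner could use the global statistics.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and the assignment strategy are assumptions.
final class ShuffleStatisticsSketch {

    /** Coordinator side: sum the per-key counts reported by each subtask. */
    static Map<String, Long> aggregate(List<Map<String, Long>> localStatistics) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStatistics) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    /** Partitioner side: place the heaviest keys first, each on the least-loaded writer. */
    static Map<String, Integer> assignKeysToWriters(Map<String, Long> global, int numWriters) {
        List<Map.Entry<String, Long>> byWeight = new ArrayList<>(global.entrySet());
        // Sort by count descending; break ties by key so the result is deterministic.
        byWeight.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });

        long[] load = new long[numWriters];
        Map<String, Integer> assignment = new HashMap<>();
        for (Map.Entry<String, Long> entry : byWeight) {
            int target = 0;
            for (int w = 1; w < numWriters; w++) {
                if (load[w] < load[target]) {
                    target = w;
                }
            }
            load[target] += entry.getValue();
            assignment.put(entry.getKey(), target);
        }
        return assignment;
    }
}
```

In this sketch a single hot key ends up alone on one writer while the lighter 
keys share the others, which is the kind of balancing the global statistics 
are meant to enable.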

While implementing Flink Iceberg data shuffling, we found that 
StatisticsCoordinator can share the logic of SourceCoordinator#runInEventLoop, 
StatisticsCoordinatorContext needs a function similar to 
SourceCoordinatorContext#callInCoordinatorThread, and the 
StatisticsCoordinatorProvider ExecutorThreadFactory logic is almost the same as 
SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. We would therefore 
like to refactor the source coordinator classes into a general coordinator 
implementation, reducing duplicated code when other coordinators are added. 
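
To make the proposed shape concrete, here is a minimal sketch of what such a 
shared base could look like. It uses only java.util.concurrent and does not 
reproduce Flink's actual classes or signatures: BaseCoordinatorSketch, 
CoordinatorThreadFactory, StatisticsCoordinatorSketch and handleFailure are 
hypothetical names, and only the method names runInEventLoop and 
callInCoordinatorThread are borrowed from the description above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; this is not Flink's actual class hierarchy.
abstract class BaseCoordinatorSketch implements AutoCloseable {

    /** Creates the single coordinator thread and remembers it for identity checks. */
    static final class CoordinatorThreadFactory implements ThreadFactory {
        private final String threadName;
        private volatile Thread thread;

        CoordinatorThreadFactory(String threadName) {
            this.threadName = threadName;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread created = new Thread(runnable, threadName);
            created.setDaemon(true);
            thread = created;
            return created;
        }

        boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == thread;
        }
    }

    private final CoordinatorThreadFactory threadFactory;
    private final ExecutorService executor;

    protected BaseCoordinatorSketch(String threadName) {
        this.threadFactory = new CoordinatorThreadFactory(threadName);
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    /** Runs an action on the coordinator thread; failures are routed to handleFailure. */
    protected void runInEventLoop(Runnable action, String description) {
        executor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                handleFailure(description, t);
            }
        });
    }

    /** Runs a callable on the coordinator thread and waits for its result. */
    protected <V> V callInCoordinatorThread(Callable<V> callable) throws Exception {
        if (threadFactory.isCurrentThreadCoordinatorThread()) {
            return callable.call(); // already on the coordinator thread, so do not block on ourselves
        }
        return executor.submit(callable).get();
    }

    protected abstract void handleFailure(String description, Throwable cause);

    @Override
    public void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

/** A concrete coordinator then only adds its own state and event handling. */
final class StatisticsCoordinatorSketch extends BaseCoordinatorSketch {
    private long eventsSeen; // only read and written on the coordinator thread

    StatisticsCoordinatorSketch() {
        super("statistics-coordinator");
    }

    void onLocalStatistics() {
        runInEventLoop(() -> eventsSeen++, "handling local statistics");
    }

    long eventsSeen() throws Exception {
        return callInCoordinatorThread(() -> eventsSeen);
    }

    @Override
    protected void handleFailure(String description, Throwable cause) {
        System.err.println("Failure while " + description + ": " + cause);
    }
}
```

Because all state is confined to one thread, a subclass needs no locking of 
its own; a source coordinator and a statistics coordinator would differ only 
in the events they handle.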

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
