gang ye created FLINK-27405:
-------------------------------
Summary: Refactor SourceCoordinator to abstract BaseCoordinator
implementation
Key: FLINK-27405
URL: https://issues.apache.org/jira/browse/FLINK-27405
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination
Reporter: gang ye
To solve the small-files issue caused by data skew, Flink Iceberg data
shuffling was proposed (design doc:
https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit).
The basic idea is to use a statistics operator to collect local statistics on
traffic distribution at the taskmanagers (workers). Local statistics are
periodically sent to the statistics coordinator (running in the jobmanager).
Once globally aggregated statistics are ready, the statistics coordinator
broadcasts them to all operator instances. A customized partitioner then uses
the global statistics, passed down from the statistics operator, to distribute
data to the Iceberg writers.
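As a rough illustration of the flow described above, the following sketch
merges per-subtask key counts into a global view and then maps keys to writers.
Everything in it is an assumption made for illustration: the class and method
names are hypothetical, statistics are modelled as plain key-to-count maps, and
the greedy "heaviest key to least-loaded writer" assignment is a stand-in, not
the algorithm from the design doc.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names exist in Flink or Iceberg.
final class StatisticsSketch {

    // Coordinator side: merge the local key counts reported by each subtask.
    static Map<String, Long> aggregate(List<Map<String, Long>> localStatistics) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStatistics) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    // Partitioner side: visit keys from heaviest to lightest and give each one
    // to the writer with the smallest load so far (assumed greedy strategy).
    static Map<String, Integer> assignWriters(Map<String, Long> global, int numWriters) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(global.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));

        long[] load = new long[numWriters];
        Map<String, Integer> assignment = new HashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            int target = 0;
            for (int i = 1; i < numWriters; i++) {
                if (load[i] < load[target]) {
                    target = i;
                }
            }
            load[target] += entry.getValue();
            assignment.put(entry.getKey(), target);
        }
        return assignment;
    }
}
```

In this sketch a single hot key ends up alone on one writer while the lighter
keys are packed onto the others, which is the kind of balancing the global
statistics are meant to enable.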
While implementing Flink Iceberg data shuffling, we found that
StatisticsCoordinator can share logic with SourceCoordinator#runInEventLoop,
that StatisticsCoordinatorContext needs a function similar to
SourceCoordinatorContext#callInCoordinatorThread, and that the
StatisticsCoordinatorProvider ExecutorThreadFactory logic is almost the same as
SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. We would therefore
like to refactor the source coordinator classes to abstract a general
coordinator implementation, reducing duplicated code when adding other
coordinators.
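To make the proposed abstraction concrete, here is a minimal sketch of what a
shared base class could look like. It is not Flink code: BaseCoordinatorSketch,
CoordinatorThreadFactorySketch and StatisticsCoordinatorSketch are hypothetical
names, the method signatures are assumptions loosely modelled on the methods
named above, and only java.util.concurrent is used so the example stands alone.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical shared thread factory; remembers the coordinator thread.
final class CoordinatorThreadFactorySketch implements ThreadFactory {
    private final String threadName;
    private volatile Thread thread;

    CoordinatorThreadFactorySketch(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        thread = new Thread(r, threadName);
        thread.setDaemon(true);
        return thread;
    }

    boolean isCurrentThread() {
        return Thread.currentThread() == thread;
    }
}

// Hypothetical base class holding the logic both coordinators would share.
abstract class BaseCoordinatorSketch implements AutoCloseable {
    private final CoordinatorThreadFactorySketch threadFactory;
    private final ExecutorService executor;

    BaseCoordinatorSketch(String operatorName) {
        this.threadFactory = new CoordinatorThreadFactorySketch(operatorName + "-coordinator");
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    // Run an action on the single coordinator thread and report failures.
    protected void runInEventLoop(Runnable action, String description) {
        executor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                handleFailure(description, t);
            }
        });
    }

    // Run directly if already on the coordinator thread, else submit and wait.
    protected <V> V callInCoordinatorThread(Callable<V> callable) throws Exception {
        if (threadFactory.isCurrentThread()) {
            return callable.call();
        }
        return executor.submit(callable).get();
    }

    protected abstract void handleFailure(String description, Throwable cause);

    @Override
    public void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

// Hypothetical subclass: state is only touched from the coordinator thread.
final class StatisticsCoordinatorSketch extends BaseCoordinatorSketch {
    private long eventsSeen;

    StatisticsCoordinatorSketch() {
        super("statistics");
    }

    void handleLocalStatistics(long count) {
        runInEventLoop(() -> eventsSeen += count, "handling local statistics");
    }

    long eventsSeen() throws Exception {
        return callInCoordinatorThread(() -> eventsSeen);
    }

    @Override
    protected void handleFailure(String description, Throwable cause) {
        System.err.println("Failure while " + description + ": " + cause);
    }
}
```

With this shape, a new coordinator only supplies its own event handling and
failure policy, while the event loop, the thread hand-off and the thread
factory live in one place.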