[ 
https://issues.apache.org/jira/browse/FLINK-27405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528443#comment-17528443
 ] 

Steven Zhen Wu edited comment on FLINK-27405 at 4/26/22 10:42 PM:
------------------------------------------------------------------

cc [~arvid] [~pnowojski] [~dwysakowicz] [~thw] please share your thoughts on 
extracting a `CoordinatorBase` abstract class from `SourceCoordinator` to 
promote code reuse. If this is OK with you, [~gang ye] can create a PR later.


was (Author: stevenz3wu):
cc [~pnowojski] [~dwysakowicz] [~thw] please share your thoughts on extracting 
a `CoordinatorBase` abstract class from `SourceCoordinator` to promote code 
reuse. If this is OK with you, [~gang ye] can create a PR later.
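A minimal sketch of what the proposed `CoordinatorBase` might factor out, written in plain Java rather than against Flink's `OperatorCoordinator` interface. The method names `runInEventLoop` and `callInCoordinatorThread` are taken from the issue description, but their signatures here, the `CoordinatorThreadFactory`, the `handleFailure` hook, and the `StatisticsCoordinator` subclass are hypothetical and do not reflect Flink's actual API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical base class for coordinators; not Flink's actual API. */
abstract class CoordinatorBase implements AutoCloseable {

    /** Creates the single named coordinator thread and remembers it (hypothetical). */
    static final class CoordinatorThreadFactory implements ThreadFactory {
        private final String threadName;
        private Thread thread;

        CoordinatorThreadFactory(String threadName) {
            this.threadName = threadName;
        }

        @Override
        public synchronized Thread newThread(Runnable runnable) {
            if (thread != null) {
                // runInEventLoop catches every failure, so the thread should never need replacing.
                throw new IllegalStateException("coordinator thread already exists");
            }
            thread = new Thread(runnable, threadName);
            thread.setDaemon(true);
            return thread;
        }

        synchronized boolean isCoordinatorThread(Thread candidate) {
            return candidate == thread;
        }
    }

    private final CoordinatorThreadFactory threadFactory;
    private final ExecutorService executor;

    protected CoordinatorBase(String name) {
        this.threadFactory = new CoordinatorThreadFactory("coordinator-" + name);
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    /** Runs an action asynchronously on the coordinator thread; failures go to handleFailure. */
    protected void runInEventLoop(Runnable action, String actionName) {
        executor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                handleFailure(actionName, t);
            }
        });
    }

    /** Calls directly if already on the coordinator thread, otherwise hands off and waits. */
    protected <T> T callInCoordinatorThread(Callable<T> callable) throws Exception {
        if (threadFactory.isCoordinatorThread(Thread.currentThread())) {
            return callable.call(); // avoids deadlocking on our own single thread
        }
        Future<T> future = executor.submit(callable);
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    /** Subclasses decide how to react to a failed action, e.g. by failing the job. */
    protected abstract void handleFailure(String actionName, Throwable t);

    @Override
    public void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

/** Hypothetical subclass: only the coordinator-specific logic remains. */
class StatisticsCoordinator extends CoordinatorBase {
    private long eventsReceived; // only read and written on the coordinator thread
    volatile String lastFailure;

    StatisticsCoordinator() {
        super("statistics");
    }

    void handleEventFromOperator(int subtask, long count) {
        runInEventLoop(() -> eventsReceived += count, "event from subtask " + subtask);
    }

    long eventsReceived() throws Exception {
        return callInCoordinatorThread(() -> eventsReceived);
    }

    @Override
    protected void handleFailure(String actionName, Throwable t) {
        lastFailure = actionName + ": " + t;
    }
}
```

Keeping all coordinator state on one thread avoids explicit locking; the subclass only supplies event handling and failure reaction, while the threading code is the part the issue proposes to share between coordinators.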

> Refactor SourceCoordinator to abstract BaseCoordinator implementation
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27405
>                 URL: https://issues.apache.org/jira/browse/FLINK-27405
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: gang ye
>            Priority: Major
>
> To solve the small-files issue caused by data skew, Flink Iceberg data 
> shuffling was proposed (design doc 
> [https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit]).
> The basic idea is to use a statistics operator to collect local statistics on 
> the traffic distribution at the TaskManagers (workers). Local statistics are 
> periodically sent to the statistics coordinator (running in the JobManager). 
> Once the globally aggregated statistics are ready, the statistics coordinator 
> broadcasts them to all operator instances. A custom partitioner then uses the 
> global statistics, passed down from the statistics operator, to distribute 
> data to the Iceberg writers.
> While implementing Flink Iceberg data shuffling, we found that 
> StatisticsCoordinator could share functionality with 
> SourceCoordinator#runInEventLoop, StatisticsCoordinatorContext needs the same 
> functionality as SourceCoordinatorContext#callInCoordinatorThread, and the 
> ExecutorThreadFactory logic in StatisticsCoordinatorProvider is almost the 
> same as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. We would 
> therefore like to refactor the source coordinator classes to extract a general 
> coordinator implementation, reducing duplicated code when other coordinators 
> are added.
>  
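The shuffling flow described in the issue (local statistics per subtask, global aggregation in the coordinator, and a partitioner driven by the global result) can be sketched in plain Java as below. The issue does not say how the partitioner turns statistics into routing decisions; the greedy heaviest-key-first assignment and all class and method names here are hypothetical, not the design doc's algorithm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical sketch of the statistics flow; not the Iceberg or Flink implementation. */
final class ShuffleStatisticsSketch {

    /** Coordinator side: merge per-subtask key counts into global counts. */
    static Map<String, Long> aggregate(List<Map<String, Long>> localStatistics) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStatistics) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    /**
     * Partitioner side: assign keys to writers, heaviest key first, always
     * picking the writer with the least traffic assigned so far.
     */
    static Map<String, Integer> assignKeys(Map<String, Long> global, int numWriters) {
        List<Map.Entry<String, Long>> keys = new ArrayList<>(global.entrySet());
        // Descending count; ties broken by key so the result is deterministic.
        keys.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        // Min-heap of {assignedLoad, writerIndex}, ties broken by lower index.
        PriorityQueue<long[]> writers = new PriorityQueue<long[]>(
                (a, b) -> a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        for (int i = 0; i < numWriters; i++) {
            writers.add(new long[] {0L, i});
        }
        Map<String, Integer> assignment = new HashMap<>();
        for (Map.Entry<String, Long> entry : keys) {
            long[] lightest = writers.poll();
            assignment.put(entry.getKey(), (int) lightest[1]);
            lightest[0] += entry.getValue();
            writers.add(lightest);
        }
        return assignment;
    }

    /** Routes a record's key to its assigned writer; unseen keys fall back to hashing. */
    static int partition(String key, Map<String, Integer> assignment, int numWriters) {
        Integer writer = assignment.get(key);
        return writer != null ? writer : Math.floorMod(key.hashCode(), numWriters);
    }
}
```

With two writers and global counts a=80, c=20, b=10, d=10, key `a` gets writer 0 and the three lighter keys share writer 1. A single hot key still lands on one writer in this scheme; splitting one key across several writers would need a different strategy.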



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
