[ https://issues.apache.org/jira/browse/FLINK-27405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528443#comment-17528443 ]
Steven Zhen Wu edited comment on FLINK-27405 at 4/26/22 10:42 PM:
------------------------------------------------------------------

cc [~arvid] [~pnowojski] [~dwysakowicz] [~thw] Please share your thoughts on extracting a `CoordinatorBase` abstract class from `SourceCoordinator` to promote code reuse. If this is OK with you, [~gang ye] can create a PR later.

was (Author: stevenz3wu):
cc [~pnowojski] [~dwysakowicz] [~thw] Please share your thoughts on extracting a `CoordinatorBase` abstract class from `SourceCoordinator` to promote code reuse. If this is OK with you, [~gang ye] can create a PR later.

> Refactor SourceCoordinator to abstract BaseCoordinator implementation
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27405
>                 URL: https://issues.apache.org/jira/browse/FLINK-27405
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: gang ye
>            Priority: Major
>
> To solve the small-files issue caused by data skew, Flink Iceberg data
> shuffling was proposed (design doc:
> [https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit]).
> The basic idea is to use a statistics operator to collect local statistics
> on traffic distribution at the taskmanagers (workers). Local statistics are
> periodically sent to the statistics coordinator (running in the jobmanager).
> Once the globally aggregated statistics are ready, the statistics coordinator
> broadcasts them to all operator instances. A customized partitioner then uses
> the global statistics, passed down from the statistics operator, to
> distribute data to the Iceberg writers.
>
> While implementing Flink Iceberg data shuffling, we found that
> StatisticsCoordinator can share functionality with
> SourceCoordinator#runInEventLoop, that StatisticsCoordinatorContext needs
> functionality similar to SourceCoordinatorContext#callInCoordinatorThread,
> and that the ExecutorThreadFactory logic in StatisticsCoordinatorProvider is
> almost the same as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory.
> We would therefore like to refactor the source coordinator classes to extract
> a general coordinator implementation, reducing duplicated code when other
> coordinators are added.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
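For discussion, here is a minimal sketch of what the proposed base class could look like. It assumes the shared pieces are exactly the three named in the issue: a single-threaded event loop (`runInEventLoop`), a blocking call into the coordinator thread (`callInCoordinatorThread`), and a named thread factory. It is plain Java with no Flink dependency. `CoordinatorBase`, `CoordinatorThreadFactory` and `StatisticsCoordinatorSketch` are hypothetical names, and the method signatures are simplified rather than copied from Flink. The real coordinators also implement Flink's `OperatorCoordinator` interface (checkpointing, failover, subtask gateways), which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical shared base for coordinators (sketch only, not Flink's API).
 * All coordinator state is meant to be touched only on one event-loop thread.
 */
abstract class CoordinatorBase implements AutoCloseable {

    /** Hypothetical thread factory: one named daemon thread per coordinator. */
    static final class CoordinatorThreadFactory implements ThreadFactory {
        private final String threadName;
        private Thread thread;

        CoordinatorThreadFactory(String threadName) {
            this.threadName = threadName;
        }

        @Override
        public synchronized Thread newThread(Runnable r) {
            thread = new Thread(r, threadName);
            thread.setDaemon(true);
            return thread;
        }

        synchronized boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == thread;
        }
    }

    private final CoordinatorThreadFactory threadFactory;
    private final ExecutorService eventLoop;

    protected CoordinatorBase(String name) {
        this.threadFactory = new CoordinatorThreadFactory(name + "-coordinator");
        this.eventLoop = Executors.newSingleThreadExecutor(threadFactory);
    }

    /** Queues an action to run asynchronously on the coordinator thread. */
    protected void runInEventLoop(Runnable action) {
        eventLoop.execute(action);
    }

    /** Runs a callable on the coordinator thread and blocks for its result. */
    protected <T> T callInCoordinatorThread(Callable<T> callable) throws Exception {
        if (threadFactory.isCurrentThreadCoordinatorThread()) {
            return callable.call(); // already on the loop: call inline to avoid self-deadlock
        }
        try {
            return eventLoop.submit(callable).get();
        } catch (ExecutionException e) {
            // Unwrap so callers see the original exception thrown by the callable.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    @Override
    public void close() throws InterruptedException {
        eventLoop.shutdown();
        eventLoop.awaitTermination(10, TimeUnit.SECONDS);
    }
}

/** Hypothetical statistics coordinator reusing the base class. */
class StatisticsCoordinatorSketch extends CoordinatorBase {
    // Only read or written on the coordinator thread, so a plain HashMap suffices.
    private final Map<String, Long> globalCounts = new HashMap<>();

    StatisticsCoordinatorSketch() {
        super("statistics");
    }

    /** Merges one subtask's local key counts into the global counts. */
    void handleLocalStatistics(Map<String, Long> localCounts) {
        runInEventLoop(() -> localCounts.forEach((k, v) -> globalCounts.merge(k, v, Long::sum)));
    }

    /** Returns a copy of the aggregated counts, taken on the coordinator thread. */
    Map<String, Long> snapshotGlobalStatistics() throws Exception {
        return callInCoordinatorThread(() -> new HashMap<>(globalCounts));
    }
}
```

Usage: if two subtasks report `{a=2, b=1}` and `{a=3}` through `handleLocalStatistics`, a later `snapshotGlobalStatistics()` returns `a=5, b=1`, because the single-threaded executor runs the queued merges before the snapshot. Confining all state changes to one thread is what lets the subclass use an unsynchronized map, and it is the part every new coordinator would otherwise re-implement.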