Hi dev,

I'd like to open a discussion on deprecating the Global Aggregate Manager in 
favor of the Operator Coordinator.


  1.  The Global Aggregate Manager is rarely used and can be replaced by the 
Operator Coordinator. The Global Aggregate Manager was introduced in [1] to 
support event time synchronization across sources and, more generally, the 
coordination of parallel tasks. AFAIK, it was only used in the Kinesis source 
[2] for an early version of watermark alignment. The Operator Coordinator, 
introduced in [3], provides a more powerful and elegant solution for that need 
and is part of the new Source API (FLIP-27).
  2.  The Global Aggregate Manager keeps its state in the JobMaster, which 
causes problems when parallelism changes adaptively. This state (the 
accumulators field in JobMaster) lives in JM memory, and its content is defined 
by user code. In my company, a user stores the task parallelism in an 
accumulator, assuming that the parallelism never changes. This assumption 
breaks with the adaptive scheduler, which does not reset the Global Aggregate 
Manager state when rescaling. See [4] for more details.

Therefore, I think we should deprecate the Global Aggregate Manager. Since the 
Operator Coordinator covers its use cases, this would improve the 
maintainability of the Flink codebase without compromising functionality. 
Looking forward to your opinions on this.

[1] https://issues.apache.org/jira/browse/FLINK-10886
[2] https://github.com/apache/flink-connector-aws/blob/d0817fecdcaa53c4bf039761c2d1a16e8fb9f89b/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-SplitEnumerator
[4] https://issues.apache.org/jira/browse/FLINK-31245?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel 
(FLINK-31245: Adaptive scheduler does not reset the state of 
GlobalAggregateManager when rescaling)

Best,
Zhanghao Chen
