Hi Max,

Thank you for the proposal. It tackles a very important issue for Flink users, and the design looks promising overall!
I have some questions to better understand the proposed public interfaces and the algorithm.

1) The proposal seems to assume that an operator's busyTimeMsPerSecond can reach 1 sec. I believe this is mostly true for CPU-bound operators. Could you confirm that it also holds for I/O-bound operators such as sinks? For example, suppose a Kafka sink subtask has hit an I/O bottleneck when flushing data out to the Kafka cluster: will busyTimeMsPerSecond reach 1 sec?

2) The proposal says that "users can configure a maximum time to fully process the backlog", but the configuration section does not seem to include this config. Could you specify it? Also, could the proposal provide the formula for calculating the new processing rate?

3) How are users expected to specify the per-operator configs (e.g. target utilization)? For example, should users specify them programmatically in the DataStream/Table/SQL API?

4) How often will the Flink Kubernetes operator query metrics from the JobManager? Is this configurable?

5) Could you specify the config name and default value for each of the proposed configs?

6) Could you add the name/mbean/type for each of the proposed metrics?

Cheers,
Dong