Sergey Kononov created FLINK-36419:
--------------------------------------
Summary: Consider scaling in case of no input records on a source
operator with long running job
Key: FLINK-36419
URL: https://issues.apache.org/jira/browse/FLINK-36419
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.9.0
Reporter: Sergey Kononov
A Hudi table source introduces two connected vertices: Split Monitor and
Split Reader. The Flink Kubernetes Operator treats Split Monitor as the actual
source because it has no input vertices. Split Monitor stops reading and
assigning splits long before Split Reader has read all the data from the Hudi
table, so the job keeps running at low parallelism much longer than it would
with rescaling. The reason is that the Autoscaler (in
[ScalingMetricEvaluator|https://github.com/apache/flink-kubernetes-operator/blob/release-1.9/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java])
currently evaluates target data rates starting from the sources. If a source
stops reading data, all downstream target data rates become zero and the
corresponding vertices are kept at the lowest configured parallelism.
Refactoring the Hudi source according to FLIP-27 and FLIP-33 could help with
this issue. Meanwhile, a rather simple and general fix can be made to the
Autoscaler to support this case.
The solution is to evaluate target data rates starting from the last downstream
vertex or vertices in the job graph that still feed data. It would also
resolve the general case where some vertices continue to process data after
the sources have already stopped. I tested this solution on a k8s cluster and
am willing to contribute it if assigned.
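
As a rough illustration of the idea only (this is not the actual patch and not
Autoscaler code), choosing the vertices to start the evaluation from could be
sketched as below. The function name `effective_sources`, the edge list, and
the rates are hypothetical. The sketch assumes that "still feeds data" means
the vertex has a non-zero observed output rate while all of its inputs are idle.

```python
from collections import defaultdict


def effective_sources(edges, output_rate):
    """Return vertices that still emit records while all their inputs are idle.

    edges: list of (upstream, downstream) vertex names (hypothetical job graph).
    output_rate: observed records/sec emitted per vertex (hypothetical metric).
    """
    upstream = defaultdict(list)
    vertices = set(output_rate)
    for src, dst in edges:
        upstream[dst].append(src)
        vertices.update((src, dst))

    result = []
    for v in sorted(vertices):
        emits = output_rate.get(v, 0.0) > 0.0
        # A real source has no inputs, so all() over an empty list is True.
        inputs_idle = all(output_rate.get(u, 0.0) == 0.0 for u in upstream[v])
        if emits and inputs_idle:
            result.append(v)
    return result


# Hudi-like job: the monitor has stopped, the reader is still emitting.
edges = [("split_monitor", "split_reader"), ("split_reader", "sink")]
rates = {"split_monitor": 0.0, "split_reader": 500.0, "sink": 0.0}
print(effective_sources(edges, rates))  # ['split_reader']
```

While Split Monitor is still assigning splits, the same function returns it as
the starting vertex, so the behaviour for jobs with active sources is unchanged
in this sketch.
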
--
This message was sent by Atlassian Jira
(v8.20.10#820010)