Sergey Kononov created FLINK-36419:
--------------------------------------

             Summary: Consider scaling in case of no input records on a source operator with long running job
                 Key: FLINK-36419
                 URL: https://issues.apache.org/jira/browse/FLINK-36419
             Project: Flink
          Issue Type: Improvement
          Components: Kubernetes Operator
    Affects Versions: kubernetes-operator-1.9.0
            Reporter: Sergey Kononov

The Hudi table source introduces two connected vertices: Split Monitor and Split Reader. The Flink Kubernetes Operator treats Split Monitor as the actual source because it has no input vertices. Split Monitor stops reading and assigning splits long before Split Reader has finished reading the data from the Hudi table, so the job keeps running at low parallelism much longer than it would if it were rescaled.

The reason is that the Autoscaler (in [ScalingMetricEvaluator|https://github.com/apache/flink-kubernetes-operator/blob/release-1.9/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java]) currently evaluates target data rates starting from the sources. If a source stops reading data, all downstream target data rates become zero, and the corresponding vertices are kept at the lowest configured parallelism.

Refactoring the Hudi source according to FLIP-27 and FLIP-33 could help with this issue. Meanwhile, a rather simple and general fix can be made to the Autoscaler to support this case: evaluate target data rates starting from the last downstream vertex or vertices in the job graph that still feed data. This would also resolve the general case in which some vertices continue to process data after the sources have already stopped.

I tested this solution on a k8s cluster and am willing to contribute it if assigned.
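
To illustrate the difference between the two evaluation strategies, here is a minimal Python sketch. It is not the ScalingMetricEvaluator code. The function `target_output_rates`, the vertex names, the per-vertex `selectivity` factor, and all rate values are hypothetical and chosen only for illustration. It assumes that a vertex "still feeds data" when its observed output rate is positive while all of its upstream vertices are idle, and that such a vertex can be seeded with its own observed rate.

```python
def target_output_rates(order, upstream, observed_out, selectivity, seed_from_frontier):
    """Propagate target output rates through a job graph.

    order: vertex names in topological order (upstream before downstream).
    upstream: maps a vertex to the list of its input vertices.
    observed_out: observed records/sec emitted by each vertex.
    selectivity: records emitted per record received, for non-source vertices.
    seed_from_frontier: if True, a vertex that still emits data while all of
        its inputs are idle is treated like a source (assumption of this sketch).
    """
    targets = {}
    for v in order:
        ups = upstream.get(v, [])
        is_source = not ups
        is_frontier = (
            seed_from_frontier
            and observed_out[v] > 0
            and all(observed_out[u] == 0 for u in ups)
        )
        if is_source or is_frontier:
            # Seed the evaluation with the vertex's own observed rate.
            targets[v] = observed_out[v]
        else:
            # Derive the rate from the upstream targets.
            targets[v] = selectivity[v] * sum(targets[u] for u in ups)
    return targets


# Hypothetical job: split_monitor -> split_reader -> writer.
order = ["split_monitor", "split_reader", "writer"]
upstream = {"split_reader": ["split_monitor"], "writer": ["split_reader"]}
# The monitor has already stopped; the reader is still working through splits.
observed_out = {"split_monitor": 0.0, "split_reader": 500.0, "writer": 500.0}
selectivity = {"split_reader": 1000.0, "writer": 1.0}

source_first = target_output_rates(order, upstream, observed_out, selectivity, False)
from_frontier = target_output_rates(order, upstream, observed_out, selectivity, True)

print("source-first: ", source_first)
print("from frontier:", from_frontier)
```

With source-first evaluation every target rate in this example collapses to zero once the monitor goes idle, which corresponds to the vertices being held at minimum parallelism. When evaluation is seeded from the reader, the reader and the writer keep non-zero targets. How the real Autoscaler should derive the seed rate is a design question that this sketch does not answer.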