Sergey Kononov created FLINK-36419:
--------------------------------------

             Summary: Consider scaling in case of no input records on a source operator with long running job
                 Key: FLINK-36419
                 URL: https://issues.apache.org/jira/browse/FLINK-36419
             Project: Flink
          Issue Type: Improvement
          Components: Kubernetes Operator
    Affects Versions: kubernetes-operator-1.9.0
            Reporter: Sergey Kononov


There is a case where a Hudi table source introduces two connected vertices:
Split Monitor and Split Reader. The Flink Kubernetes Operator treats Split
Monitor as the actual source because it has no input vertices. Split Monitor
stops reading and assigning splits long before Split Reader has read all the
data from the Hudi table, so the job keeps running at low parallelism much
longer than it would with rescaling. The reason is that the autoscaler (in
[ScalingMetricEvaluator|https://github.com/apache/flink-kubernetes-operator/blob/release-1.9/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java])
currently evaluates target data rates starting from the sources. If a source
stops reading data, all downstream target data rates become zero, and the
corresponding vertices are kept at the lowest configured parallelism.
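A minimal Java sketch of the behavior described above, under simplifying assumptions: the class and method names, the per-subtask rate of 1000 records/s and the parallelism bounds are hypothetical and do not mirror ScalingMetricEvaluator's actual code, and output ratios are taken as 1. It shows how a zero rate at the only input-less vertex (Split Monitor) zeroes the target rate of Split Reader and pins it to the minimum parallelism.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of source-driven target rate propagation.
 * Names are illustrative only and are not the autoscaler's real API.
 */
final class SourceDrivenRates {

    /**
     * @param inputs     vertex -> its input vertices, iterated in topological order
     * @param sourceRate records/s each source vertex currently reads
     */
    static Map<String, Double> targetRates(Map<String, List<String>> inputs,
                                           Map<String, Double> sourceRate) {
        Map<String, Double> target = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : inputs.entrySet()) {
            String vertex = e.getKey();
            if (e.getValue().isEmpty()) {
                // A vertex without inputs is treated as a source.
                target.put(vertex, sourceRate.getOrDefault(vertex, 0.0));
            } else {
                // Non-sources only inherit what their upstream vertices are
                // expected to emit (output ratio assumed to be 1).
                double sum = 0.0;
                for (String upstream : e.getValue()) {
                    sum += target.get(upstream);
                }
                target.put(vertex, sum);
            }
        }
        return target;
    }

    /** Subtasks needed to sustain the target rate, clamped to [min, max]. */
    static int parallelism(double targetRate, double ratePerSubtask, int min, int max) {
        int needed = (int) Math.ceil(targetRate / ratePerSubtask);
        return Math.max(min, Math.min(max, needed));
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("splitMonitor", List.of());
        graph.put("splitReader", List.of("splitMonitor"));
        // The monitor has finished assigning splits, so it reads nothing.
        Map<String, Double> rates = targetRates(graph, Map.of("splitMonitor", 0.0));
        System.out.println(rates);
        System.out.println(parallelism(rates.get("splitReader"), 1000.0, 1, 32));
    }
}
```

Running main prints {splitMonitor=0.0, splitReader=0.0} followed by 1, even though Split Reader may still have many splits left to read.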

Refactoring the Hudi source according to FLIP-27 and FLIP-33 could help with
this issue. Meanwhile, a fairly simple and general fix can be made to the
autoscaler to support this case.

The proposed solution is to evaluate target data rates starting from the last
downstream vertex or vertices in the job graph that still feed data. This would
also resolve the general case where some vertices continue to process data
after the sources have already stopped. I have tested this solution on a k8s
cluster and am willing to contribute it if assigned.
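One possible reading of the proposed fix, again as a hypothetical Java sketch rather than the tested patch mentioned above: a vertex whose propagated upstream rate is zero but which still processes records (here Split Reader draining already assigned splits) becomes a starting point for the evaluation, and rates propagate downstream from it. The observed per-vertex rates are an assumed input, and all names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: when the rate propagated from upstream is zero but a
 * vertex still processes records, seed the evaluation from that vertex.
 */
final class ActiveVertexRates {

    /**
     * @param inputs       vertex -> its input vertices, iterated in topological order
     * @param observedRate records/s each vertex currently processes (assumed metric)
     */
    static Map<String, Double> targetRates(Map<String, List<String>> inputs,
                                           Map<String, Double> observedRate) {
        Map<String, Double> target = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : inputs.entrySet()) {
            String vertex = e.getKey();
            double propagated = 0.0;
            for (String upstream : e.getValue()) {
                propagated += target.get(upstream);
            }
            double observed = observedRate.getOrDefault(vertex, 0.0);
            if (e.getValue().isEmpty() || (propagated == 0.0 && observed > 0.0)) {
                // Real sources, and vertices that still see data after everything
                // upstream went idle, act as starting points of the evaluation.
                target.put(vertex, observed);
            } else {
                // Otherwise keep the usual upstream-driven target rate.
                target.put(vertex, propagated);
            }
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("splitMonitor", List.of());
        graph.put("splitReader", List.of("splitMonitor"));
        graph.put("sink", List.of("splitReader"));
        // The monitor has stopped; the reader is still draining assigned splits.
        Map<String, Double> observed =
                Map.of("splitMonitor", 0.0, "splitReader", 5000.0, "sink", 5000.0);
        System.out.println(targetRates(graph, observed));
    }
}
```

Running main prints {splitMonitor=0.0, splitReader=5000.0, sink=5000.0}. The override only applies when the propagated rate is exactly zero, so evaluation is unchanged while sources are active. A real implementation would likely also need to account for output ratios and backlog catch-up, which this sketch omits.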



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
