
After disabling the adaptive scheduler, I was able to have the operator stop
the job with a savepoint and resume it from that savepoint after the
upgrade. However, I observed that the upgrade lifecycle is quite slow, as it
takes down all the task managers and then brings them back up. I am wondering
whether there is a way to have the operator stop the job with a savepoint and
resume from it without taking down the task managers. If I switch from
application mode to session mode (with only one job on that session cluster)
and use the adaptive scheduler, would that solve the problem?


On Wed, Jun 12, 2024 at 7:13 PM Chetas Joshi <chetas.jo...@gmail.com> wrote:

> Got it. Thanks!
> On Wed, Jun 12, 2024 at 6:49 PM Zhanghao Chen <zhanghao.c...@outlook.com>
> wrote:
>> > Does this mean it won't trigger a checkpoint before scaling up or
>> > scaling down?
>> The in-place rescaling won't do that.
>> > Do I need autotuning to be turned on for exactly-once processing?
>> It suffices to go back to the full-restart upgrade mode provided by
>> the operator: disable the adaptive scheduler and set upgradeMode to
>> savepoint [1]. The operator will then stop the job with a final savepoint
>> and resume the job from that savepoint after the upgrade.
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>> Best,
>> Zhanghao Chen
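The following Python snippet sketches the configuration described above. It builds a FlinkDeployment manifest as a plain dict and prints it as JSON, which `kubectl apply` also accepts. It assumes the operator's `flink.apache.org/v1beta1` CRD. The deployment name, jar URI and savepoint directory are hypothetical placeholders. The `job.autoscaler.enabled` key is an assumption to check against your operator version. Required sections such as JobManager/TaskManager resources are omitted.

```python
import json


def savepoint_upgrade_deployment(name: str, jar_uri: str, savepoint_dir: str) -> dict:
    """Build a FlinkDeployment manifest that upgrades via stop-with-savepoint.

    All argument values are hypothetical; JobManager/TaskManager resource
    sections are omitted and would be needed in a real deployment.
    """
    return {
        "apiVersion": "flink.apache.org/v1beta1",
        "kind": "FlinkDeployment",
        "metadata": {"name": name},
        "spec": {
            "flinkVersion": "v1_18",
            "flinkConfiguration": {
                # Use the default scheduler instead of the adaptive one, so that
                # scaling goes through the full stop/restart upgrade path.
                "jobmanager.scheduler": "Default",
                # Assumed autoscaler switch; verify the key for your operator version.
                "job.autoscaler.enabled": "true",
                # Savepoint-based upgrades need somewhere to write the savepoint.
                "state.savepoints.dir": savepoint_dir,
            },
            "job": {
                "jarURI": jar_uri,
                # Stop with a savepoint and restore from it on upgrade.
                "upgradeMode": "savepoint",
            },
        },
    }


manifest = savepoint_upgrade_deployment(
    name="pipeline",
    jar_uri="local:///opt/flink/usrlib/pipeline.jar",
    savepoint_dir="s3://example-bucket/savepoints",
)
print(json.dumps(manifest, indent=2))
```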
>> ------------------------------
>> *From:* Chetas Joshi <chetas.jo...@gmail.com>
>> *Sent:* Thursday, June 13, 2024 6:33
>> *To:* Zhanghao Chen <zhanghao.c...@outlook.com>
>> *Cc:* Sachin Sharma <sachinapr...@gmail.com>; Gyula Fóra <
>> gyula.f...@gmail.com>; Oscar Perez via user <user@flink.apache.org>
>> *Subject:* Re: Understanding flink-autoscaler behavior
>> Hi Zhanghao,
>> If I am using the autoscaler (flink-k8s-operator) without enabling
>> autotuning, the documentation says it triggers in-place scaling without
>> going through a full job upgrade lifecycle. Does this mean it won't trigger
>> a checkpoint before scaling up or scaling down?
>> I am observing that entries are read from Kafka (the source in my
>> Flink app) multiple times when the Flink app goes through scaling. Do I need
>> autotuning to be turned on for exactly-once processing?
>> Thanks
>> Chetas
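A toy model may help explain the repeated Kafka reads. It assumes, as in Flink's checkpoint-based recovery, that a restart restores the source offsets stored in the latest completed checkpoint. An in-place rescale is one such restart. Records read after that checkpoint are therefore read again. The class and method names are made up for illustration and are not Flink APIs.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class ToySource:
    """Toy single-partition source: each offset stands for one Kafka record."""
    position: int = 0
    checkpointed_offset: int = 0
    reads: list = field(default_factory=list)

    def poll(self, n: int) -> None:
        # Read the next n records and advance the in-memory position.
        self.reads.extend(range(self.position, self.position + n))
        self.position += n

    def complete_checkpoint(self) -> None:
        # A completed checkpoint stores the current offset as part of the job state.
        self.checkpointed_offset = self.position

    def restart_from_latest_checkpoint(self) -> None:
        # A restart (e.g. for an in-place rescale) rewinds to the checkpointed offset.
        self.position = self.checkpointed_offset


src = ToySource()
src.poll(100)                         # offsets 0..99
src.complete_checkpoint()             # checkpoint covers offsets up to 100
src.poll(50)                          # offsets 100..149, not covered by any checkpoint
src.restart_from_latest_checkpoint()  # rescale -> restore from the checkpoint
src.poll(50)                          # offsets 100..149 are read a second time

counts = Counter(src.reads)
replayed = sorted(offset for offset, c in counts.items() if c > 1)
print(replayed[0], replayed[-1], len(replayed))  # 100 149 50
```

Re-reading on its own does not break exactly-once state semantics, because operator state is rolled back to the same checkpoint. Whether the output is also exactly-once depends on the sink, for example whether it is transactional.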
>> On Fri, Jun 7, 2024 at 8:28 PM Zhanghao Chen <zhanghao.c...@outlook.com>
>> wrote:
>> Hi,
>> Reactive mode and the Autoscaler in the Kubernetes operator are two different
>> approaches to elastic scaling in Flink.
>> Reactive mode [1] has to be used together with Flink's passive resource
>> management approach (only Standalone mode takes this approach),
>> where the TM number is controlled by external systems and the job
>> parallelism is derived from the available resources in a reactive manner.
>> The workflow is as follows:
>> An external service (like the K8s HPA controller) monitors the job load and
>> adjusts the TM number to match the load -> Flink adjusts the job parallelism
>> *reactively*.
>> The Autoscaler [2] is to be used with Flink's active resource management
>> approach (YARN/K8s Application/Session mode), where the TM number
>> is derived from the job's resource needs, and Flink actively requests/releases
>> TMs to match them. The workflow for the Autoscaler is:
>> The Autoscaler monitors the job load and adjusts the job parallelism *actively*
>> to match the load -> Flink adjusts the TM number to match the job's needs.
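The two workflows can be contrasted with a deliberately simplified Python sketch. In Reactive mode the parallelism follows the available slots. In the active approach the parallelism is chosen per vertex and the TM count follows from it. The function names and the example job are hypothetical. The sketch ignores multiple slot sharing groups and the Autoscaler's actual parallelism algorithm.

```python
import math


def reactive_mode_parallelism(num_taskmanagers: int, slots_per_tm: int,
                              max_parallelism: dict) -> dict:
    """Reactive mode (simplified): every vertex uses as many of the available
    slots as its own max parallelism allows."""
    available_slots = num_taskmanagers * slots_per_tm
    return {vertex: min(available_slots, mp) for vertex, mp in max_parallelism.items()}


def active_mode_taskmanagers(target_parallelism: dict, slots_per_tm: int) -> int:
    """Active mode (simplified): parallelism is chosen per vertex first, then Flink
    requests enough TMs. With one slot sharing group, the slot demand equals the
    largest vertex parallelism."""
    slots_needed = max(target_parallelism.values())
    return math.ceil(slots_needed / slots_per_tm)


# Hypothetical three-vertex job.
max_par = {"source": 1, "map": 20, "sink": 20}
print(reactive_mode_parallelism(3, 2, max_par))   # {'source': 1, 'map': 6, 'sink': 6}
print(active_mode_taskmanagers({"source": 1, "map": 5, "sink": 2}, 2))  # 3
```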
>> I would recommend using the Autoscaler instead of Reactive mode because:
>>    1. The Autoscaler provides an all-in-one solution, while Reactive mode
>>    needs to be paired with an external system that adjusts the TM number.
>>    2. The Autoscaler has a fine-tuned scaling algorithm tailored for
>>    streaming workloads. It fine-tunes each operator's parallelism and
>>    takes task load balancing into account, while Reactive mode just adjusts
>>    all operators' parallelism uniformly.
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode
>> [2]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/
>> Best,
>> Zhanghao Chen
>> ------------------------------
>> *From:* Sachin Sharma <sachinapr...@gmail.com>
>> *Sent:* Saturday, June 8, 2024 3:02
>> *To:* Gyula Fóra <gyula.f...@gmail.com>
>> *Cc:* Chetas Joshi <chetas.jo...@gmail.com>; Oscar Perez via user <
>> user@flink.apache.org>
>> *Subject:* Re: Understanding flink-autoscaler behavior
>> Hi,
>> I have a question related to this.
>> I am doing a POC with Kubernetes operator 1.8 and Flink 1.18 with
>> Reactive mode enabled. I added some dummy slow and fast operators to the
>> Flink job, and I can see that backpressure has accumulated, but I am not
>> sure why my Flink task managers are not scaled by the operator. Also, can
>> someone explain whether the autoscaler's job is just to add more task
>> managers, with Reactive mode then adjusting the parallelism based on the
>> configuration?
>> As per the operator documentation, users configure the target
>> utilization percentage of the operators in the pipeline, e.g. keep all
>> operators between 60% - 80% busy. The autoscaler then finds a parallelism
>> configuration such that the output rates of all operators match the input
>> rates of all their downstream operators at the targeted utilization. So
>> does the autoscaler (part of the Kubernetes operator) control the
>> parallelism, or does Reactive mode in the Flink job control it?
>> Thanks & Regards,
>> Sachin Sharma
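The utilization band from the documentation can be turned into two processing-rate thresholds. A vertex needs at least target_rate / upper_utilization records/s of capacity to stay below the upper bound. It has more capacity than needed once it exceeds target_rate / lower_utilization. This Python sketch assumes the band is target ± boundary, as configured by `job.autoscaler.target.utilization` and `job.autoscaler.target.utilization.boundary`. It also leaves out the catch-up term that the real Autoscaler adds for lag. The function name and the example rate are hypothetical.

```python
import math


def processing_rate_thresholds(target_rate: float, target_utilization: float,
                               boundary: float) -> tuple:
    """Simplified: the processing capacity at which a vertex would run exactly at
    the upper and lower edge of the utilization band. Ignores catch-up for lag."""
    upper = target_utilization + boundary
    lower = target_utilization - boundary
    scale_up_threshold = target_rate / upper
    # With a lower edge of 0 the vertex can never be "too fast", hence infinity.
    scale_down_threshold = target_rate / lower if lower > 0 else math.inf
    return scale_up_threshold, scale_down_threshold


# Hypothetical vertex receiving 700 records/s, kept between 60% and 80% busy.
up, down = processing_rate_thresholds(700.0, target_utilization=0.7, boundary=0.1)
print(round(up, 2), round(down, 2))  # 875.0 1166.67
```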
>> On Fri, Jun 7, 2024 at 4:55 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>> Hi!
>> To simplify things, you can generally look at TRUE_PROCESSING_RATE (TPR).
>> If the TPR is below the scale-up threshold, we should scale up, and if it's
>> above the scale-down threshold, we scale down.
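The rule above can be written as a small Python function. The function name is hypothetical, and the function captures only the threshold comparison, not the Autoscaler's other checks. The sample call uses the source's logged TRUE_PROCESSING_RATE average and its SCALE_UP_RATE_THRESHOLD and SCALE_DOWN_RATE_THRESHOLD values. It reproduces the "should scale up" conclusion.

```python
import math


def scaling_decision(tpr: float, scale_up_threshold: float,
                     scale_down_threshold: float) -> str:
    """Compare a vertex's true processing rate with its two thresholds."""
    if tpr < scale_up_threshold:
        return "scale up"      # not enough capacity for the upper utilization bound
    if tpr > scale_down_threshold:
        return "scale down"    # more capacity than needed for the lower bound
    return "no change"


# Values for source cbc357ccb763df2852fee8c4fc7d55f2 from the evaluated metrics log.
print(scaling_decision(17498.932, scale_up_threshold=83995.0,
                       scale_down_threshold=math.inf))  # scale up
```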
>> In your case, the logs show the following for your source
>> (cbc357ccb763df2852fee8c4fc7d55f2):
>> TPR: 17498
>> So in theory it should definitely be scaled up; however, you can also see:
>> `Updating source cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based
>> on available partitions to 1`
>> This means that the source max parallelism was determined by the
>> available Kafka partitions to be 1. It would not make sense to increase the
>> parallelism, even though we are clearly falling behind, because the source
>> cannot consume a single partition in parallel.
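This simplified Python sketch uses the logged numbers to show why the partition count wins. The scale factor target capacity / TPR asks for roughly 10x the current parallelism. The result is then capped at the source's max parallelism of 1. The function name is hypothetical. The 0.4 lower bound on the scale factor is taken from the log line "capped by maximum scale down factor to 0.4", and the 200 cap from "Specified autoscaler maximum parallelism 200". The real JobVertexScaler applies further adjustments that are left out here.

```python
import math


def capped_parallelism(current: int, target_capacity: float, tpr: float,
                       vertex_max_parallelism: int, autoscaler_max_parallelism: int,
                       min_scale_factor: float = 0.4) -> int:
    """Simplified parallelism computation for one vertex."""
    scale_factor = target_capacity / tpr
    # Limit how far a single decision may scale down.
    scale_factor = max(scale_factor, min_scale_factor)
    desired = math.ceil(current * scale_factor)
    # The result can never exceed the vertex's max parallelism (for a Kafka
    # source, the partition count) or the autoscaler-wide maximum.
    return max(1, min(desired, vertex_max_parallelism, autoscaler_max_parallelism))


# Logged values for source cbc357ccb763df2852fee8c4fc7d55f2.
tpr = 17498.932104004747
print(capped_parallelism(1, 168270.0, tpr, vertex_max_parallelism=1,
                         autoscaler_max_parallelism=200))   # 1
print(capped_parallelism(1, 168270.0, tpr, vertex_max_parallelism=20,
                         autoscaler_max_parallelism=200))   # 10 (hypothetical 20 partitions)
```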
>> Hope this helps
>> Gyula
>> On Fri, Jun 7, 2024 at 3:41 AM Chetas Joshi <chetas.jo...@gmail.com>
>> wrote:
>> Hi Community,
>> I want to understand the following logs from the flink-k8s-operator
>> autoscaler. My Flink pipeline, running on Flink 1.18.0 with
>> flink-k8s-operator 1.8.0, is not scaling up even though the source vertex
>> is backpressured.
>> 2024-06-06 21:33:35,270 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Updating source cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based on
>> available partitions to 1
>> 2024-06-06 21:33:35,276 o.a.f.a.RestApiMetricsCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC,
>> Source__dd-log-source.numRecordsOut=SOURCE_TASK_NUM_RECORDS_OUT,
>> Source__dd-log-source.numRecordsIn=SOURCE_TASK_NUM_RECORDS_IN,
>> Source__dd-log-source.pendingRecords=PENDING_RECORDS,
>> Source__dd-log-source.numRecordsInPerSecond=SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
>> backPressuredTimeMsPerSecond=BACKPRESSURE_TIME_PER_SEC} for
>> cbc357ccb763df2852fee8c4fc7d55f2
>> 2024-06-06 21:33:35,282 o.a.f.a.RestApiMetricsCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
>> 61214243927da46230dfd349fba7b8e6
>> 2024-06-06 21:33:35,286 o.a.f.a.RestApiMetricsCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
>> 7758b2b5ada48872db09a5c48176e34e
>> 2024-06-06 21:33:35,291 o.a.f.a.RestApiMetricsCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
>> eab9c0013081b8479e60463931f3a593
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Calculating vertex scaling metrics for cbc357ccb763df2852fee8c4fc7d55f2
>> from
>> {BACKPRESSURE_TIME_PER_SEC=AggregatedMetric{id='backPressuredTimeMsPerSecond',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'},
>> BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond', mim='192.0',
>> max='192.0', avg='192.0', sum='192.0'},
>> SOURCE_TASK_NUM_RECORDS_OUT=AggregatedMetric{id='Source__dd-log-source.numRecordsOut',
>> mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'},
>> PENDING_RECORDS=AggregatedMetric{id='Source__dd-log-source.pendingRecords',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'},
>> SOURCE_TASK_NUM_RECORDS_IN=AggregatedMetric{id='Source__dd-log-source.numRecordsIn',
>> mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'},
>> SOURCE_TASK_NUM_RECORDS_IN_PER_SEC=AggregatedMetric{id='Source__dd-log-source.numRecordsInPerSecond',
>> mim='1682.7333333333333', max='1682.7333333333333',
>> avg='1682.7333333333333', sum='1682.7333333333333'}}
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Vertex scaling metrics for cbc357ccb763df2852fee8c4fc7d55f2:
>> NUM_RECORDS_IN=613279.0, OBSERVED_TPR=Infinity, LAG=0.0}
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Calculating vertex scaling metrics for eab9c0013081b8479e60463931f3a593
>> from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'}}
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Vertex scaling metrics for eab9c0013081b8479e60463931f3a593:
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Calculating vertex scaling metrics for 61214243927da46230dfd349fba7b8e6
>> from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'}}
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Vertex scaling metrics for 61214243927da46230dfd349fba7b8e6:
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Calculating vertex scaling metrics for 7758b2b5ada48872db09a5c48176e34e
>> from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'}}
>> 2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Vertex scaling metrics for 7758b2b5ada48872db09a5c48176e34e:
>> NUM_RECORDS_IN=117.0}
>> 2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Global metrics: {NUM_TASK_SLOTS_USED=1.0,
>> HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8,
>> 2024-06-06 21:33:35,306 o.a.f.a.ScalingTracking        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Cannot record restart duration because already set in the latest record:
>> PT0.114185S
>> 2024-06-06 21:33:35,307 o.a.f.a.JobAutoScalerImpl      
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Collected metrics:
>> CollectedMetricHistory(jobTopology=JobTopology(vertexInfos={cbc357ccb763df2852fee8c4fc7d55f2=VertexInfo(id=cbc357ccb763df2852fee8c4fc7d55f2,
>> inputs={}, outputs={61214243927da46230dfd349fba7b8e6=REBALANCE,
>> 7758b2b5ada48872db09a5c48176e34e=HASH}, parallelism=1, maxParallelism=1,
>> originalMaxParallelism=20, finished=false,
>> ioMetrics=IOMetrics(numRecordsIn=0, numRecordsOut=125,
>> accumulatedBusyTime=32301.0)),
>> eab9c0013081b8479e60463931f3a593=VertexInfo(id=eab9c0013081b8479e60463931f3a593,
>> inputs={7758b2b5ada48872db09a5c48176e34e=REBALANCE}, outputs={},
>> parallelism=1, maxParallelism=1, originalMaxParallelism=1, finished=false,
>> ioMetrics=IOMetrics(numRecordsIn=8, numRecordsOut=0,
>> accumulatedBusyTime=0.0)),
>> 61214243927da46230dfd349fba7b8e6=VertexInfo(id=61214243927da46230dfd349fba7b8e6,
>> inputs={cbc357ccb763df2852fee8c4fc7d55f2=REBALANCE}, outputs={},
>> parallelism=1, maxParallelism=1, originalMaxParallelism=1, finished=false,
>> ioMetrics=IOMetrics(numRecordsIn=8, numRecordsOut=0,
>> accumulatedBusyTime=0.0)),
>> 7758b2b5ada48872db09a5c48176e34e=VertexInfo(id=7758b2b5ada48872db09a5c48176e34e,
>> inputs={cbc357ccb763df2852fee8c4fc7d55f2=HASH},
>> outputs={eab9c0013081b8479e60463931f3a593=REBALANCE}, parallelism=1,
>> maxParallelism=20, originalMaxParallelism=20, finished=false,
>> ioMetrics=IOMetrics(numRecordsIn=117, numRecordsOut=8,
>> accumulatedBusyTime=0.0))}, finishedVertices=[],
>> verticesInTopologicalOrder=[cbc357ccb763df2852fee8c4fc7d55f2,
>> 61214243927da46230dfd349fba7b8e6, 7758b2b5ada48872db09a5c48176e34e,
>> eab9c0013081b8479e60463931f3a593]),
>> metricHistory={2024-06-06T21:32:35.170678Z=CollectedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={ACCUMULATED_BUSY_TIME=20821.0,
>> NUM_RECORDS_OUT=109.0, LOAD=0.0, NUM_RECORDS_IN=512339.0,
>> OBSERVED_TPR=Infinity, LAG=0.0},
>> eab9c0013081b8479e60463931f3a593={ACCUMULATED_BUSY_TIME=0.0,
>> 61214243927da46230dfd349fba7b8e6={ACCUMULATED_BUSY_TIME=0.0,
>> 7758b2b5ada48872db09a5c48176e34e={ACCUMULATED_BUSY_TIME=0.0,
>> globalMetrics={NUM_TASK_SLOTS_USED=1.0,
>> HEAP_MAX_USAGE_RATIO=0.5849425971687019, HEAP_MEMORY_USED=4.08495608E8,
>> 2024-06-06T21:33:35.258489Z=CollectedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={ACCUMULATED_BUSY_TIME=32301.0,
>> NUM_RECORDS_OUT=125.0, LOAD=0.192, NUM_RECORDS_IN=613279.0,
>> OBSERVED_TPR=Infinity, LAG=0.0},
>> eab9c0013081b8479e60463931f3a593={ACCUMULATED_BUSY_TIME=0.0,
>> 61214243927da46230dfd349fba7b8e6={ACCUMULATED_BUSY_TIME=0.0,
>> 7758b2b5ada48872db09a5c48176e34e={ACCUMULATED_BUSY_TIME=0.0,
>> globalMetrics={NUM_TASK_SLOTS_USED=1.0,
>> HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8,
>> jobRunningTs=2024-06-06T21:17:35.712Z, fullyCollected=true)
>> 2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Restart time used in metrics evaluation: PT5M
>> 2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Using busy time based tpr 17498.932104004747 for
>> cbc357ccb763df2852fee8c4fc7d55f2.
>> 2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computing edge (cbc357ccb763df2852fee8c4fc7d55f2,
>> 61214243927da46230dfd349fba7b8e6) data rate for single input downstream task
>> 2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed output ratio for edge (cbc357ccb763df2852fee8c4fc7d55f2 ->
>> 61214243927da46230dfd349fba7b8e6) : 9.906875371507826E-6
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computing edge (cbc357ccb763df2852fee8c4fc7d55f2,
>> 7758b2b5ada48872db09a5c48176e34e) data rate for single input downstream task
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed output ratio for edge (cbc357ccb763df2852fee8c4fc7d55f2 ->
>> 7758b2b5ada48872db09a5c48176e34e) : 1.486031305726174E-4
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computing edge (7758b2b5ada48872db09a5c48176e34e,
>> eab9c0013081b8479e60463931f3a593) data rate for single input downstream task
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed output ratio for edge (7758b2b5ada48872db09a5c48176e34e ->
>> eab9c0013081b8479e60463931f3a593) : 0.06666666666666667
>> 2024-06-06 21:33:35,308 o.a.f.a.JobAutoScalerImpl      
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Evaluated metrics:
>> EvaluatedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
>> average=1679.897), PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN),
>> SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=83995.0,
>> average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
>> average=17498.932), LOAD=EvaluatedScalingMetric(current=NaN,
>> average=0.096),
>> SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
>> average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
>> average=NaN), LAG=EvaluatedScalingMetric(current=0.0, average=NaN)},
>> eab9c0013081b8479e60463931f3a593={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
>> average=0.017), PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=1.0,
>> average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
>> average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
>> SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
>> average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
>> average=NaN)},
>> 61214243927da46230dfd349fba7b8e6={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
>> average=0.017), PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=1.0,
>> average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
>> average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
>> SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
>> average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
>> average=NaN)},
>> 7758b2b5ada48872db09a5c48176e34e={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
>> average=0.25), PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=13.0,
>> average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=20.0,
>> average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
>> average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
>> SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
>> average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
>> average=NaN)}},
>> globalMetrics={HEAP_MAX_USAGE_RATIO=EvaluatedScalingMetric(current=0.68,
>> average=0.632), NUM_TASK_SLOTS_USED=EvaluatedScalingMetric(current=1.0,
>> average=NaN), GC_PRESSURE=EvaluatedScalingMetric(current=NaN, average=NaN),
>> HEAP_MEMORY_USED=EvaluatedScalingMetric(current=4.74886648E8,
>> average=4.41691128E8),
>> METASPACE_MEMORY_USED=EvaluatedScalingMetric(current=1.40677456E8,
>> average=1.41885624E8),
>> MANAGED_MEMORY_USED=EvaluatedScalingMetric(current=0.0, average=0.0)})
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingExecutor        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Restart time used in scaling summary computation: PT5M
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Target processing capacity for cbc357ccb763df2852fee8c4fc7d55f2 is 168270.0
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Capped target processing capacity for cbc357ccb763df2852fee8c4fc7d55f2 is
>> 168270.0
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Specified autoscaler maximum parallelism 200 is greater than the operator
>> max parallelism 1. This means the operator max parallelism can never be
>> reached.
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Target processing capacity for eab9c0013081b8479e60463931f3a593 is 2.0
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed scale factor of 0.0 for eab9c0013081b8479e60463931f3a593 is capped
>> by maximum scale down factor to 0.4
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Capped target processing capacity for eab9c0013081b8479e60463931f3a593 is
>> Infinity
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Specified autoscaler maximum parallelism 200 is greater than the operator
>> max parallelism 1. This means the operator max parallelism can never be
>> reached.
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Target processing capacity for 61214243927da46230dfd349fba7b8e6 is 2.0
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed scale factor of 0.0 for 61214243927da46230dfd349fba7b8e6 is capped
>> by maximum scale down factor to 0.4
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Capped target processing capacity for 61214243927da46230dfd349fba7b8e6 is
>> Infinity
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Specified autoscaler maximum parallelism 200 is greater than the operator
>> max parallelism 1. This means the operator max parallelism can never be
>> reached.
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Target processing capacity for 7758b2b5ada48872db09a5c48176e34e is 25.0
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed scale factor of 0.0 for 7758b2b5ada48872db09a5c48176e34e is capped
>> by maximum scale down factor to 0.4
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Capped target processing capacity for 7758b2b5ada48872db09a5c48176e34e is
>> Infinity
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Specified autoscaler maximum parallelism 200 is greater than the operator
>> max parallelism 20. This means the operator max parallelism can never be
>> reached.
>> 2024-06-06 21:33:35,309 o.a.f.a.ScalingExecutor        [INFO ][flink/pipeline-pipelinelocal]
>> All job vertices are currently running at their target parallelism.
>> Some questions:
>> 1. Does the autoscaler decide to scale a job vertex when the target
>> processing capacity is higher than the current processing capacity? If yes,
>> how can I check the current processing capacity?
>> 2. Which metrics in the logs above show that the target utilization
>> threshold is not reached, and hence that all vertices are running at their
>> target parallelism?
>> Thank you
>> Chetas
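On the first question, the "current processing capacity" corresponds to the TRUE_PROCESSING_RATE in the evaluated metrics. For the source, the logged numbers are consistent with a busy-time-based estimate: average input rate divided by average load. This Python sketch shows that calculation. The function name is hypothetical, and the formula is inferred from the logged values rather than copied from the autoscaler source.

```python
def busy_time_tpr(avg_input_rate: float, busy_ms_per_sec_samples: list) -> float:
    """Estimate the true processing rate as the average input rate divided by
    the average load (busy time as a fraction of each second)."""
    loads = [busy_ms / 1000.0 for busy_ms in busy_ms_per_sec_samples]
    avg_load = sum(loads) / len(loads)
    return avg_input_rate / avg_load


# Source cbc357ccb763df2852fee8c4fc7d55f2: LOAD was 0.0 and 0.192 in the two
# metric-history entries, and the average TARGET_DATA_RATE was about 1679.897.
print(round(busy_time_tpr(1679.897, [0.0, 192.0]), 1))  # 17498.9 (log: 17498.932...)
```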
