You can try session mode with only one job, but still with the adaptive 
scheduler disabled. When a session job is stopped, the TMs are not released 
immediately and can be reused later.

Best,
Zhanghao Chen
________________________________
From: Chetas Joshi <chetas.jo...@gmail.com>
Sent: Tuesday, June 25, 2024 1:59
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: Gyula Fóra <gyula.f...@gmail.com>; Oscar Perez via user 
<user@flink.apache.org>
Subject: Re: Understanding flink-autoscaler behavior

Hello,

After disabling the adaptive scheduler, I was able to have the operator stop 
the job with a savepoint and resume the job from that savepoint after the 
upgrade. However, I observed that the upgrade life cycle is quite slow because 
it takes down and then brings back up all the task managers. Is there a way to 
have the operator stop the job with a savepoint and resume from it without 
taking down the task managers? If I switch from application mode to session 
mode (only one job on that session cluster) and use the adaptive scheduler, 
does that solve the problem?

Thanks
Chetas

On Wed, Jun 12, 2024 at 7:13 PM Chetas Joshi <chetas.jo...@gmail.com> wrote:
Got it. Thanks!

On Wed, Jun 12, 2024 at 6:49 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
> Does this mean it won't trigger a checkpoint before scaling up or scaling 
> down?

The in-place rescaling won't do that.

> Do I need the autotuning to be turned on for exactly once processing?

It suffices to go back to the full-restart upgrade mode provided by the 
operator: disable the adaptive scheduler and set the upgradeMode to savepoint 
[1]. The operator will then stop the job with a final savepoint and resume the 
job from that savepoint after the upgrade.

[1] 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades


Best,
Zhanghao Chen
________________________________
From: Chetas Joshi <chetas.jo...@gmail.com>
Sent: Thursday, June 13, 2024 6:33
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: Sachin Sharma <sachinapr...@gmail.com>; Gyula Fóra <gyula.f...@gmail.com>; 
Oscar Perez via user <user@flink.apache.org>
Subject: Re: Understanding flink-autoscaler behavior

Hi Zhanghao,

If I am using the autoscaler (flink-k8s-operator) without enabling auto-tuning, 
the documentation says it triggers in-place scaling without going through a 
full job upgrade lifecycle. Does this mean it won't trigger a checkpoint before 
scaling up or scaling down?

I am observing that entries are being read from Kafka (the source in my Flink 
app) multiple times whenever the Flink app is rescaled. Do I need autotuning to 
be turned on for exactly-once processing?

Thanks
Chetas

On Fri, Jun 7, 2024 at 8:28 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
Hi,

Reactive mode and the Autoscaler in Kubernetes operator are two different 
approaches towards elastic scaling of Flink.

Reactive mode [1] has to be used together with the passive resource management 
approach of Flink (only Standalone mode takes this approach), where the TM 
number is controlled by external systems and the job parallelism is derived 
from the available resources in a reactive manner. The workflow is as follows:

An external service (like the K8s HPA controller) monitors job load and adjusts 
the TM number to match the load -> Flink adjusts job parallelism reactively.

The Autoscaler [2] is to be used with the active resource management approach 
of Flink (YARN/K8s Application/Session mode), where the TM number is derived 
from the job's resource needs, and Flink actively requests/releases TMs to 
match those needs. The workflow for the Autoscaler is:

Autoscaler monitors job load and adjusts job parallelism actively to match the 
load -> Flink adjusts TM number to match the job need.
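
To make the direction of control in the two workflows concrete, here is a 
minimal Python sketch. The function names and the slots-per-TM parameter are 
illustrative assumptions, not Flink or operator APIs, and the sketch assumes 
default slot sharing, where a job needs as many slots as its highest operator 
parallelism.

```python
import math


def reactive_parallelism(num_task_managers: int, slots_per_tm: int,
                         max_parallelism: int) -> int:
    """Reactive mode: an external system picks the TM count and the job
    parallelism follows from the slots that happen to be available."""
    available_slots = num_task_managers * slots_per_tm
    return min(available_slots, max_parallelism)


def required_task_managers(vertex_parallelisms: dict, slots_per_tm: int) -> int:
    """Autoscaler with active resource management: parallelism is chosen
    first and the TM count follows from the slots the job needs."""
    # Assumption: default slot sharing, so the widest vertex sets the slot need.
    slots_needed = max(vertex_parallelisms.values())
    return math.ceil(slots_needed / slots_per_tm)


# Hypothetical numbers, for illustration only.
print(reactive_parallelism(num_task_managers=3, slots_per_tm=2, max_parallelism=128))
print(required_task_managers({"source": 1, "map": 4, "sink": 2}, slots_per_tm=2))
```

In the first function the TM count is the input and parallelism is the output; 
in the second it is the other way round, which is the difference between the 
two approaches.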

I would recommend using the Autoscaler instead of Reactive mode for two reasons:

  1. The Autoscaler provides an all-in-one solution, while Reactive mode needs 
     to be paired with an external system that adjusts the TM number.
  2. The Autoscaler has a scaling algorithm tailored for streaming workloads. 
     It tunes each operator's parallelism individually and takes task load 
     balancing into account, while Reactive mode just adjusts all operators' 
     parallelism uniformly.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode
[2] 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/


Best,
Zhanghao Chen
________________________________
From: Sachin Sharma <sachinapr...@gmail.com>
Sent: Saturday, June 8, 2024 3:02
To: Gyula Fóra <gyula.f...@gmail.com>
Cc: Chetas Joshi <chetas.jo...@gmail.com>; Oscar Perez via user 
<user@flink.apache.org>
Subject: Re: Understanding flink-autoscaler behavior

Hi,

I have a question related to this.

I am doing a POC with Kubernetes operator 1.8 and Flink 1.18 with Reactive mode 
enabled. I added some dummy slow and fast operators to the Flink job, and I can 
see that back pressure has accumulated, but I am not sure why my Flink task 
managers are not being scaled by the operator. Also, can someone explain 
whether the autoscaler's job is just to add more task managers, after which 
Reactive mode adjusts the parallelism based on the configuration? As per the 
operator documentation, users configure the target utilization percentage of 
the operators in the pipeline, e.g. keep all the operators between 60% - 80% 
busy. The autoscaler then finds a parallelism configuration such that the 
output rates of all operators match the input rates of all their downstream 
operators at the targeted utilization. So does the autoscaler (part of the 
Kubernetes operator) control the parallelism, or does Reactive mode in the 
Flink job control it?

Thanks & Regards,
Sachin Sharma




On Fri, Jun 7, 2024 at 4:55 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
Hi!

To simplify things you can generally look at TRUE_PROCESSING_RATE (TPR), 
SCALE_UP_RATE_THRESHOLD and SCALE_DOWN_RATE_THRESHOLD.
If TPR is below the scale-up threshold then we should scale up, and if it's 
above the scale-down threshold then we scale down.

In your case what we see for your source (cbc357ccb763df2852fee8c4fc7d55f2) as 
logged is that:

TPR: 17498
SCALE_UP_RATE_THRESHOLD: 83995

So it should definitely be scaled up in theory, however you can also see:
`Updating source cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based on 
available partitions to 1`

This means that the source max parallelism was determined by the available 
Kafka partitions to be 1. It would not make sense to increase the parallelism, 
even though we are clearly falling behind, because the source cannot consume a 
single partition in parallel.
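
The simplified rule above can be sketched in Python as follows. This is only an 
illustration of the rule as described in this message, not the operator's 
actual implementation; the class and function names are made up for the 
example, and the numbers are copied from the logged metrics for the source 
vertex.

```python
from dataclasses import dataclass


@dataclass
class VertexMetrics:
    true_processing_rate: float
    scale_up_rate_threshold: float
    scale_down_rate_threshold: float
    parallelism: int
    max_parallelism: int


def scaling_decision(m: VertexMetrics) -> str:
    """Simplified rule: compare TPR against the two thresholds, then
    respect the parallelism bounds of the vertex."""
    if m.true_processing_rate < m.scale_up_rate_threshold:
        # Falling behind, but a vertex cannot go past its max parallelism.
        if m.parallelism >= m.max_parallelism:
            return "no change (already at max parallelism)"
        return "scale up"
    if m.true_processing_rate > m.scale_down_rate_threshold:
        if m.parallelism <= 1:
            return "no change (already at parallelism 1)"
        return "scale down"
    return "no change"


# Source vertex cbc357ccb763df2852fee8c4fc7d55f2, values taken from the log.
source = VertexMetrics(
    true_processing_rate=17498.932,
    scale_up_rate_threshold=83995.0,
    scale_down_rate_threshold=float("inf"),
    parallelism=1,
    max_parallelism=1,  # capped by the single Kafka partition
)
print(scaling_decision(source))
```

With max parallelism 1 the sketch reports no change even though TPR is far 
below the scale-up threshold. Raising max_parallelism in the example (which in 
practice would mean more Kafka partitions) makes it return "scale up".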

Hope this helps
Gyula

On Fri, Jun 7, 2024 at 3:41 AM Chetas Joshi <chetas.jo...@gmail.com> wrote:
Hi Community,

I want to understand the following logs from the flink-k8s-operator autoscaler. 
My flink pipeline running on 1.18.0 and using flink-k8s-operator (1.8.0) is not 
scaling up even though the source vertex is back-pressured.



2024-06-06 21:33:35,270 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Updating source 
cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based on available partitions 
to 1

2024-06-06 21:33:35,276 o.a.f.a.RestApiMetricsCollector 
[DEBUG][flink/pipeline-pipelinelocal] Querying metrics 
{busyTimeMsPerSecond=BUSY_TIME_PER_SEC, 
Source__dd-log-source.numRecordsOut=SOURCE_TASK_NUM_RECORDS_OUT, 
Source__dd-log-source.numRecordsIn=SOURCE_TASK_NUM_RECORDS_IN, 
Source__dd-log-source.pendingRecords=PENDING_RECORDS, 
Source__dd-log-source.numRecordsInPerSecond=SOURCE_TASK_NUM_RECORDS_IN_PER_SEC, 
backPressuredTimeMsPerSecond=BACKPRESSURE_TIME_PER_SEC} for 
cbc357ccb763df2852fee8c4fc7d55f2

2024-06-06 21:33:35,282 o.a.f.a.RestApiMetricsCollector 
[DEBUG][flink/pipeline-pipelinelocal] Querying metrics 
{busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for 61214243927da46230dfd349fba7b8e6

2024-06-06 21:33:35,286 o.a.f.a.RestApiMetricsCollector 
[DEBUG][flink/pipeline-pipelinelocal] Querying metrics 
{busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for 7758b2b5ada48872db09a5c48176e34e

2024-06-06 21:33:35,291 o.a.f.a.RestApiMetricsCollector 
[DEBUG][flink/pipeline-pipelinelocal] Querying metrics 
{busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for eab9c0013081b8479e60463931f3a593

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Calculating vertex scaling metrics for 
cbc357ccb763df2852fee8c4fc7d55f2 from 
{BACKPRESSURE_TIME_PER_SEC=AggregatedMetric{id='backPressuredTimeMsPerSecond', 
mim='0.0', max='0.0', avg='0.0', sum='0.0'}, 
BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond', mim='192.0', 
max='192.0', avg='192.0', sum='192.0'}, 
SOURCE_TASK_NUM_RECORDS_OUT=AggregatedMetric{id='Source__dd-log-source.numRecordsOut',
 mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'}, 
PENDING_RECORDS=AggregatedMetric{id='Source__dd-log-source.pendingRecords', 
mim='0.0', max='0.0', avg='0.0', sum='0.0'}, 
SOURCE_TASK_NUM_RECORDS_IN=AggregatedMetric{id='Source__dd-log-source.numRecordsIn',
 mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'}, 
SOURCE_TASK_NUM_RECORDS_IN_PER_SEC=AggregatedMetric{id='Source__dd-log-source.numRecordsInPerSecond',
 mim='1682.7333333333333', max='1682.7333333333333', avg='1682.7333333333333', 
sum='1682.7333333333333'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Vertex scaling metrics for 
cbc357ccb763df2852fee8c4fc7d55f2: {ACCUMULATED_BUSY_TIME=32301.0, 
NUM_RECORDS_OUT=125.0, LOAD=0.192, NUM_RECORDS_IN=613279.0, 
OBSERVED_TPR=Infinity, LAG=0.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Calculating vertex scaling metrics for 
eab9c0013081b8479e60463931f3a593 from 
{BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond', mim='0.0', 
max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Vertex scaling metrics for 
eab9c0013081b8479e60463931f3a593: {ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=8.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Calculating vertex scaling metrics for 
61214243927da46230dfd349fba7b8e6 from 
{BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond', mim='0.0', 
max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Vertex scaling metrics for 
61214243927da46230dfd349fba7b8e6: {ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=8.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Calculating vertex scaling metrics for 
7758b2b5ada48872db09a5c48176e34e from 
{BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond', mim='0.0', 
max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Vertex scaling metrics for 
7758b2b5ada48872db09a5c48176e34e: {ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=8.0, LOAD=0.0, NUM_RECORDS_IN=117.0}

2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector 
[DEBUG][flink/pipeline-pipelinelocal] Global metrics: {NUM_TASK_SLOTS_USED=1.0, 
HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8, 
METASPACE_MEMORY_USED=1.40677456E8, MANAGED_MEMORY_USED=0.0}

2024-06-06 21:33:35,306 o.a.f.a.ScalingTracking        
[DEBUG][flink/pipeline-pipelinelocal] Cannot record restart duration because 
already set in the latest record: PT0.114185S

2024-06-06 21:33:35,307 o.a.f.a.JobAutoScalerImpl      
[DEBUG][flink/pipeline-pipelinelocal] Collected metrics: 
CollectedMetricHistory(jobTopology=JobTopology(vertexInfos={cbc357ccb763df2852fee8c4fc7d55f2=VertexInfo(id=cbc357ccb763df2852fee8c4fc7d55f2,
 inputs={}, outputs={61214243927da46230dfd349fba7b8e6=REBALANCE, 
7758b2b5ada48872db09a5c48176e34e=HASH}, parallelism=1, maxParallelism=1, 
originalMaxParallelism=20, finished=false, ioMetrics=IOMetrics(numRecordsIn=0, 
numRecordsOut=125, accumulatedBusyTime=32301.0)), 
eab9c0013081b8479e60463931f3a593=VertexInfo(id=eab9c0013081b8479e60463931f3a593,
 inputs={7758b2b5ada48872db09a5c48176e34e=REBALANCE}, outputs={}, 
parallelism=1, maxParallelism=1, originalMaxParallelism=1, finished=false, 
ioMetrics=IOMetrics(numRecordsIn=8, numRecordsOut=0, accumulatedBusyTime=0.0)), 
61214243927da46230dfd349fba7b8e6=VertexInfo(id=61214243927da46230dfd349fba7b8e6,
 inputs={cbc357ccb763df2852fee8c4fc7d55f2=REBALANCE}, outputs={}, 
parallelism=1, maxParallelism=1, originalMaxParallelism=1, finished=false, 
ioMetrics=IOMetrics(numRecordsIn=8, numRecordsOut=0, accumulatedBusyTime=0.0)), 
7758b2b5ada48872db09a5c48176e34e=VertexInfo(id=7758b2b5ada48872db09a5c48176e34e,
 inputs={cbc357ccb763df2852fee8c4fc7d55f2=HASH}, 
outputs={eab9c0013081b8479e60463931f3a593=REBALANCE}, parallelism=1, 
maxParallelism=20, originalMaxParallelism=20, finished=false, 
ioMetrics=IOMetrics(numRecordsIn=117, numRecordsOut=8, 
accumulatedBusyTime=0.0))}, finishedVertices=[], 
verticesInTopologicalOrder=[cbc357ccb763df2852fee8c4fc7d55f2, 
61214243927da46230dfd349fba7b8e6, 7758b2b5ada48872db09a5c48176e34e, 
eab9c0013081b8479e60463931f3a593]), 
metricHistory={2024-06-06T21:32:35.170678Z=CollectedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={ACCUMULATED_BUSY_TIME=20821.0,
 NUM_RECORDS_OUT=109.0, LOAD=0.0, NUM_RECORDS_IN=512339.0, 
OBSERVED_TPR=Infinity, LAG=0.0}, 
eab9c0013081b8479e60463931f3a593={ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=7.0}, 
61214243927da46230dfd349fba7b8e6={ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=7.0}, 
7758b2b5ada48872db09a5c48176e34e={ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=7.0, LOAD=0.0, NUM_RECORDS_IN=102.0}}, 
globalMetrics={NUM_TASK_SLOTS_USED=1.0, 
HEAP_MAX_USAGE_RATIO=0.5849425971687019, HEAP_MEMORY_USED=4.08495608E8, 
METASPACE_MEMORY_USED=1.43093792E8, MANAGED_MEMORY_USED=0.0}), 
2024-06-06T21:33:35.258489Z=CollectedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={ACCUMULATED_BUSY_TIME=32301.0,
 NUM_RECORDS_OUT=125.0, LOAD=0.192, NUM_RECORDS_IN=613279.0, 
OBSERVED_TPR=Infinity, LAG=0.0}, 
eab9c0013081b8479e60463931f3a593={ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=8.0}, 
61214243927da46230dfd349fba7b8e6={ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=8.0}, 
7758b2b5ada48872db09a5c48176e34e={ACCUMULATED_BUSY_TIME=0.0, 
NUM_RECORDS_OUT=8.0, LOAD=0.0, NUM_RECORDS_IN=117.0}}, 
globalMetrics={NUM_TASK_SLOTS_USED=1.0, 
HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8, 
METASPACE_MEMORY_USED=1.40677456E8, MANAGED_MEMORY_USED=0.0})}, 
jobRunningTs=2024-06-06T21:17:35.712Z, fullyCollected=true)

2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
[DEBUG][flink/pipeline-pipelinelocal] Restart time used in metrics evaluation: 
PT5M

2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
[DEBUG][flink/pipeline-pipelinelocal] Using busy time based tpr 
17498.932104004747 for cbc357ccb763df2852fee8c4fc7d55f2.

2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
[DEBUG][flink/pipeline-pipelinelocal] Computing edge 
(cbc357ccb763df2852fee8c4fc7d55f2, 61214243927da46230dfd349fba7b8e6) data rate 
for single input downstream task

2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
[DEBUG][flink/pipeline-pipelinelocal] Computed output ratio for edge 
(cbc357ccb763df2852fee8c4fc7d55f2 -> 61214243927da46230dfd349fba7b8e6) : 
9.906875371507826E-6

2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
[DEBUG][flink/pipeline-pipelinelocal] Computing edge 
(cbc357ccb763df2852fee8c4fc7d55f2, 7758b2b5ada48872db09a5c48176e34e) data rate 
for single input downstream task

2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
[DEBUG][flink/pipeline-pipelinelocal] Computed output ratio for edge 
(cbc357ccb763df2852fee8c4fc7d55f2 -> 7758b2b5ada48872db09a5c48176e34e) : 
1.486031305726174E-4

2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
[DEBUG][flink/pipeline-pipelinelocal] Computing edge 
(7758b2b5ada48872db09a5c48176e34e, eab9c0013081b8479e60463931f3a593) data rate 
for single input downstream task

2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
[DEBUG][flink/pipeline-pipelinelocal] Computed output ratio for edge 
(7758b2b5ada48872db09a5c48176e34e -> eab9c0013081b8479e60463931f3a593) : 
0.06666666666666667

2024-06-06 21:33:35,308 o.a.f.a.JobAutoScalerImpl      
[DEBUG][flink/pipeline-pipelinelocal] Evaluated metrics: 
EvaluatedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
 average=1679.897), PARALLELISM=EvaluatedScalingMetric(current=1.0, 
average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=83995.0, 
average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0, average=NaN), 
TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN, average=17498.932), 
LOAD=EvaluatedScalingMetric(current=NaN, average=0.096), 
SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity, 
average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0, 
average=NaN), LAG=EvaluatedScalingMetric(current=0.0, average=NaN)}, 
eab9c0013081b8479e60463931f3a593={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
 average=0.017), PARALLELISM=EvaluatedScalingMetric(current=1.0, average=NaN), 
SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=1.0, average=NaN), 
MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0, average=NaN), 
TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN, average=Infinity), 
LOAD=EvaluatedScalingMetric(current=NaN, average=0.0), 
SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity, 
average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0, 
average=NaN)}, 
61214243927da46230dfd349fba7b8e6={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
 average=0.017), PARALLELISM=EvaluatedScalingMetric(current=1.0, average=NaN), 
SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=1.0, average=NaN), 
MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0, average=NaN), 
TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN, average=Infinity), 
LOAD=EvaluatedScalingMetric(current=NaN, average=0.0), 
SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity, 
average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0, 
average=NaN)}, 
7758b2b5ada48872db09a5c48176e34e={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
 average=0.25), PARALLELISM=EvaluatedScalingMetric(current=1.0, average=NaN), 
SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=13.0, average=NaN), 
MAX_PARALLELISM=EvaluatedScalingMetric(current=20.0, average=NaN), 
TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN, average=Infinity), 
LOAD=EvaluatedScalingMetric(current=NaN, average=0.0), 
SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity, 
average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0, 
average=NaN)}}, 
globalMetrics={HEAP_MAX_USAGE_RATIO=EvaluatedScalingMetric(current=0.68, 
average=0.632), NUM_TASK_SLOTS_USED=EvaluatedScalingMetric(current=1.0, 
average=NaN), GC_PRESSURE=EvaluatedScalingMetric(current=NaN, average=NaN), 
HEAP_MEMORY_USED=EvaluatedScalingMetric(current=4.74886648E8, 
average=4.41691128E8), 
METASPACE_MEMORY_USED=EvaluatedScalingMetric(current=1.40677456E8, 
average=1.41885624E8), MANAGED_MEMORY_USED=EvaluatedScalingMetric(current=0.0, 
average=0.0)})

2024-06-06 21:33:35,308 o.a.f.a.ScalingExecutor        
[DEBUG][flink/pipeline-pipelinelocal] Restart time used in scaling summary 
computation: PT5M

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Target processing capacity for 
cbc357ccb763df2852fee8c4fc7d55f2 is 168270.0

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Capped target processing capacity for 
cbc357ccb763df2852fee8c4fc7d55f2 is 168270.0

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Specified autoscaler maximum parallelism 
200 is greater than the operator max parallelism 1. This means the operator max 
parallelism can never be reached.

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Target processing capacity for 
eab9c0013081b8479e60463931f3a593 is 2.0

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Computed scale factor of 0.0 for 
eab9c0013081b8479e60463931f3a593 is capped by maximum scale down factor to 0.4

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Capped target processing capacity for 
eab9c0013081b8479e60463931f3a593 is Infinity

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Specified autoscaler maximum parallelism 
200 is greater than the operator max parallelism 1. This means the operator max 
parallelism can never be reached.

2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Target processing capacity for 
61214243927da46230dfd349fba7b8e6 is 2.0

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Computed scale factor of 0.0 for 
61214243927da46230dfd349fba7b8e6 is capped by maximum scale down factor to 0.4

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Capped target processing capacity for 
61214243927da46230dfd349fba7b8e6 is Infinity

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Specified autoscaler maximum parallelism 
200 is greater than the operator max parallelism 1. This means the operator max 
parallelism can never be reached.

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Target processing capacity for 
7758b2b5ada48872db09a5c48176e34e is 25.0

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Computed scale factor of 0.0 for 
7758b2b5ada48872db09a5c48176e34e is capped by maximum scale down factor to 0.4

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Capped target processing capacity for 
7758b2b5ada48872db09a5c48176e34e is Infinity

2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
[DEBUG][flink/pipeline-pipelinelocal] Specified autoscaler maximum parallelism 
200 is greater than the operator max parallelism 20. This means the operator 
max parallelism can never be reached.

2024-06-06 21:33:35,309 o.a.f.a.ScalingExecutor        [INFO 
][flink/pipeline-pipelinelocal] All job vertices are currently running at their 
target parallelism.
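
As a cross-check on the log above, the "Computed output ratio" values appear 
consistent with dividing the growth in a downstream vertex's NUM_RECORDS_IN by 
the growth in its upstream vertex's NUM_RECORDS_IN between the two 
metricHistory snapshots. The Python sketch below redoes that arithmetic with 
values copied from the log. The formula is inferred from the numbers, not taken 
from the autoscaler source, and the vertex IDs are shortened to their first 
four characters for readability.

```python
# NUM_RECORDS_IN per vertex at the two metricHistory timestamps in the log.
records_in_before = {"cbc3": 512339.0, "6121": 7.0, "7758": 102.0, "eab9": 7.0}
records_in_after = {"cbc3": 613279.0, "6121": 8.0, "7758": 117.0, "eab9": 8.0}


def records_in_delta(vertex: str) -> float:
    """Records received by a vertex between the two snapshots."""
    return records_in_after[vertex] - records_in_before[vertex]


def output_ratio(upstream: str, downstream: str) -> float:
    """Assumed formula: downstream input growth per upstream input record."""
    return records_in_delta(downstream) / records_in_delta(upstream)


for upstream, downstream in [("cbc3", "6121"), ("cbc3", "7758"), ("7758", "eab9")]:
    print(upstream, "->", downstream, output_ratio(upstream, downstream))
```

The three results can be compared against the logged ratios for the same edges: 
9.906875371507826E-6, 1.486031305726174E-4 and 0.06666666666666667.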


Some Questions

1. Does the autoscaler decide to scale a job vertex when the target processing 
capacity is higher than the current processing capacity? If yes, how can I 
check the current processing capacity?

2. Which metrics in the above logs indicate that the target utilization 
threshold has not been reached and hence that all the vertices are running at 
their target parallelism?


Thank you
Chetas
