Hello,

After disabling the adaptive scheduler, I was able to have the operator stop
the job with a savepoint and resume it from that savepoint after the
upgrade. However, I observed that the upgrade lifecycle is quite slow because
it takes down and then brings back up all the task managers. Is there a way
to have the operator stop the job with a savepoint and resume from it without
taking down the task managers? If I switch from application mode to session
mode (with only one job on that session cluster) and use the adaptive
scheduler, would that solve the problem?

Thanks
Chetas

On Wed, Jun 12, 2024 at 7:13 PM Chetas Joshi <chetas.jo...@gmail.com> wrote:

> Got it. Thanks!
>
> On Wed, Jun 12, 2024 at 6:49 PM Zhanghao Chen <zhanghao.c...@outlook.com>
> wrote:
>
>> > Does this mean it won't trigger a checkpoint before scaling up or
>> scaling down?
>>
>> The in-place rescaling won't do that.
>>
>> > Do I need the autotuning to be turned on for exactly once processing?
>>
>> It suffices to go back to the full-restart upgrade mode provided by the
>> operator: disable the adaptive scheduler and set the upgradeMode to
>> savepoint [1]. The operator will stop the job with a final savepoint and
>> resume the job from that savepoint after the upgrade.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>>
>> Best,
>> Zhanghao Chen
>> ------------------------------
>> *From:* Chetas Joshi <chetas.jo...@gmail.com>
>> *Sent:* Thursday, June 13, 2024 6:33
>> *To:* Zhanghao Chen <zhanghao.c...@outlook.com>
>> *Cc:* Sachin Sharma <sachinapr...@gmail.com>; Gyula Fóra <
>> gyula.f...@gmail.com>; Oscar Perez via user <user@flink.apache.org>
>>
>> *Subject:* Re: Understanding flink-autoscaler behavior
>>
>> Hi Zhanghao,
>>
>> If I am using the autoscaler (flink-k8s-operator) without enabling
>> auto-tuning, the documentation says it triggers in-place scaling without
>> going through a full job upgrade lifecycle. Does this mean it won't trigger
>> a checkpoint before scaling up or scaling down?
>>
>> I am observing that entries are being read from Kafka (the source in my
>> Flink app) multiple times when the Flink app goes through scaling. Do I need
>> autotuning to be turned on for exactly-once processing?
>>
>> Thanks
>> Chetas
>>
>> On Fri, Jun 7, 2024 at 8:28 PM Zhanghao Chen <zhanghao.c...@outlook.com>
>> wrote:
>>
>> Hi,
>>
>> Reactive mode and the Autoscaler in Kubernetes operator are two different
>> approaches towards elastic scaling of Flink.
>>
>> Reactive mode [1] has to be used together with the passive resource
>> management approach of Flink (only Standalone mode takes this approach),
>> where the TM number is controlled by external systems and the job
>> parallelism is derived from the available resources in a reactive manner.
>> The workflow is as follows:
>>
>> An external service (like the K8s HPA controller) monitors job load and adjusts
>> the TM number to match the load -> Flink adjusts job parallelism *reactively*.
>>
>> The Autoscaler [2] is to be used with the active resource management
>> approach of Flink (YARN/K8s Application/Session mode), where the TM number
>> is derived from the job's resource needs, and Flink actively requests/releases
>> TMs to match the job's needs. The workflow for the Autoscaler is:
>>
>> Autoscaler monitors job load and adjusts job parallelism *actively* to
>> match the load -> Flink adjusts TM number to match the job need.
>>
>> I would recommend using the Autoscaler instead of Reactive mode for two
>> reasons:
>>
>>    1. The Autoscaler provides an all-in-one solution, while Reactive mode
>>    needs to be paired with an external system that adjusts the TM number.
>>    2. The Autoscaler has a fine-tuned scaling algorithm tailored for
>>    streaming workloads. It tunes each operator's parallelism individually
>>    and takes task load balancing into account, while Reactive mode just
>>    adjusts all operators' parallelism uniformly.
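
The two workflows described above run in opposite directions, which a small sketch can make concrete. This is only an illustration, not Flink code: the function names are made up, and it assumes every operator shares the default slot sharing group, so the number of slots a job needs equals its highest operator parallelism.

```python
import math


def reactive_parallelism(num_task_managers: int, slots_per_tm: int) -> int:
    """Reactive mode (sketch): the TM count is given from outside, and
    every operator runs at the parallelism the available slots allow."""
    return num_task_managers * slots_per_tm


def task_managers_needed(vertex_parallelism: dict, slots_per_tm: int) -> int:
    """Autoscaler direction (sketch): per-operator parallelism is chosen
    first, and the TM count follows from the slots the job needs.
    Assumes default slot sharing, so slots needed = max parallelism."""
    slots_needed = max(vertex_parallelism.values())
    return math.ceil(slots_needed / slots_per_tm)


# Reactive: 3 TMs with 2 slots each give a uniform parallelism of 6.
uniform = reactive_parallelism(num_task_managers=3, slots_per_tm=2)

# Autoscaler: hypothetical per-operator parallelism decides the TM count.
tms = task_managers_needed({"source": 1, "map": 5, "sink": 2}, slots_per_tm=2)

print(uniform, tms)
```

In the first function the resources are the input and the parallelism is the output; in the second it is the other way around.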
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode
>> [2]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/
>>
>>
>> Best,
>> Zhanghao Chen
>> ------------------------------
>> *From:* Sachin Sharma <sachinapr...@gmail.com>
>> *Sent:* Saturday, June 8, 2024 3:02
>> *To:* Gyula Fóra <gyula.f...@gmail.com>
>> *Cc:* Chetas Joshi <chetas.jo...@gmail.com>; Oscar Perez via user <
>> user@flink.apache.org>
>> *Subject:* Re: Understanding flink-autoscaler behavior
>>
>> Hi,
>>
>> I have a question related to this.
>>
>> I am doing a POC with Kubernetes operator 1.8 and Flink 1.18 with
>> Reactive mode enabled. I added some dummy slow and fast operators to the
>> Flink job, and I can see that back pressure has accumulated, but I am not
>> sure why my Flink task managers are not scaled by the operator. Also, can
>> someone explain whether the autoscaler's job is just to add more task
>> managers, with Reactive mode then adjusting the parallelism based on the
>> configuration?
>> As per the operator documentation: "Users configure the target
>> utilization percentage of the operators in the pipeline, e.g. keep all
>> operators between 60% - 80% busy. The autoscaler then finds a parallelism
>> configuration such that the output rates of all operators match the input
>> rates of all their downstream operators at the targeted utilization." So
>> does the autoscaler (part of the Kubernetes operator) control the
>> parallelism, or does Reactive mode in the Flink job control it?
>>
>> Thanks & Regards,
>> Sachin Sharma
>>
>>
>>
>>
>> On Fri, Jun 7, 2024 at 4:55 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>> Hi!
>>
>> To simplify things, you can generally look at TRUE_PROCESSING_RATE,
>> SCALE_UP_RATE_THRESHOLD and SCALE_DOWN_RATE_THRESHOLD.
>> If TPR is below the scale-up threshold, then we should scale up, and if it's
>> above the scale-down threshold, then we scale down.
>>
>> In your case what we see for your source
>> (cbc357ccb763df2852fee8c4fc7d55f2) as logged is that:
>>
>> TPR: 17498
>> SCALE_UP_THRESHOLD: 83995
>>
>> So it should definitely be scaled up in theory, however you can also see:
>> `Updating source cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based
>> on available partitions to 1`
>>
>> This means that the source max parallelism was determined by the
>> available Kafka partitions to be 1. It would not make sense to increase the
>> parallelism, even though we are clearly falling behind, as the source cannot
>> consume a single partition in parallel.
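
The rule of thumb above, applied to the logged values, can be sketched as follows. This is a simplified illustration and not the autoscaler's actual implementation: the function name and return labels are made up, and only the comparison described in this message is modelled.

```python
import math


def scaling_decision(tpr, scale_up_threshold, scale_down_threshold,
                     parallelism, max_parallelism):
    """Simplified sketch of the rule of thumb: compare the true processing
    rate (TPR) with the two thresholds, then respect max parallelism."""
    if tpr < scale_up_threshold:
        wanted = "scale_up"
    elif tpr > scale_down_threshold:
        wanted = "scale_down"
    else:
        wanted = "none"
    # A vertex already at its max parallelism cannot be scaled up further.
    if wanted == "scale_up" and parallelism >= max_parallelism:
        return "blocked_by_max_parallelism"
    return wanted


# Values logged for source cbc357ccb763df2852fee8c4fc7d55f2:
# TPR 17498.932, scale-up threshold 83995, scale-down threshold Infinity,
# parallelism 1, max parallelism 1 (one Kafka partition).
decision = scaling_decision(17498.932, 83995.0, math.inf, 1, 1)
print(decision)
```

With a max parallelism above 1 (for example, more Kafka partitions), the same inputs would yield a scale-up under this sketch.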
>>
>> Hope this helps
>> Gyula
>>
>> On Fri, Jun 7, 2024 at 3:41 AM Chetas Joshi <chetas.jo...@gmail.com>
>> wrote:
>>
>> Hi Community,
>>
>> I want to understand the following logs from the flink-k8s-operator
>> autoscaler. My Flink pipeline, running on 1.18.0 and using
>> flink-k8s-operator (1.8.0), is not scaling up even though the source vertex
>> is back-pressured.
>>
>>
>> 2024-06-06 21:33:35,270 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Updating source cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based on
>> available partitions to 1
>>
>> 2024-06-06 21:33:35,276 o.a.f.a.RestApiMetricsCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC,
>> Source__dd-log-source.numRecordsOut=SOURCE_TASK_NUM_RECORDS_OUT,
>> Source__dd-log-source.numRecordsIn=SOURCE_TASK_NUM_RECORDS_IN,
>> Source__dd-log-source.pendingRecords=PENDING_RECORDS,
>> Source__dd-log-source.numRecordsInPerSecond=SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
>> backPressuredTimeMsPerSecond=BACKPRESSURE_TIME_PER_SEC} for
>> cbc357ccb763df2852fee8c4fc7d55f2
>>
>> 2024-06-06 21:33:35,282 o.a.f.a.RestApiMetricsCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
>> 61214243927da46230dfd349fba7b8e6
>>
>> 2024-06-06 21:33:35,286 o.a.f.a.RestApiMetricsCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
>> 7758b2b5ada48872db09a5c48176e34e
>>
>> 2024-06-06 21:33:35,291 o.a.f.a.RestApiMetricsCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
>> eab9c0013081b8479e60463931f3a593
>>
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Calculating vertex scaling metrics for cbc357ccb763df2852fee8c4fc7d55f2
>> from
>> {BACKPRESSURE_TIME_PER_SEC=AggregatedMetric{id='backPressuredTimeMsPerSecond',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'},
>> BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond', mim='192.0',
>> max='192.0', avg='192.0', sum='192.0'},
>> SOURCE_TASK_NUM_RECORDS_OUT=AggregatedMetric{id='Source__dd-log-source.numRecordsOut',
>> mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'},
>> PENDING_RECORDS=AggregatedMetric{id='Source__dd-log-source.pendingRecords',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'},
>> SOURCE_TASK_NUM_RECORDS_IN=AggregatedMetric{id='Source__dd-log-source.numRecordsIn',
>> mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'},
>> SOURCE_TASK_NUM_RECORDS_IN_PER_SEC=AggregatedMetric{id='Source__dd-log-source.numRecordsInPerSecond',
>> mim='1682.7333333333333', max='1682.7333333333333',
>> avg='1682.7333333333333', sum='1682.7333333333333'}}
>>
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Vertex scaling metrics for cbc357ccb763df2852fee8c4fc7d55f2:
>> {ACCUMULATED_BUSY_TIME=32301.0, NUM_RECORDS_OUT=125.0, LOAD=0.192,
>> NUM_RECORDS_IN=613279.0, OBSERVED_TPR=Infinity, LAG=0.0}
>>
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Calculating vertex scaling metrics for eab9c0013081b8479e60463931f3a593
>> from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'}}
>>
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Vertex scaling metrics for eab9c0013081b8479e60463931f3a593:
>> {ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=0.0, LOAD=0.0,
>> NUM_RECORDS_IN=8.0}
>>
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Calculating vertex scaling metrics for 61214243927da46230dfd349fba7b8e6
>> from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'}}
>>
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Vertex scaling metrics for 61214243927da46230dfd349fba7b8e6:
>> {ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=0.0, LOAD=0.0,
>> NUM_RECORDS_IN=8.0}
>>
>> 2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Calculating vertex scaling metrics for 7758b2b5ada48872db09a5c48176e34e
>> from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
>> mim='0.0', max='0.0', avg='0.0', sum='0.0'}}
>>
>> 2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Vertex scaling metrics for 7758b2b5ada48872db09a5c48176e34e:
>> {ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=8.0, LOAD=0.0,
>> NUM_RECORDS_IN=117.0}
>>
>> 2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Global metrics: {NUM_TASK_SLOTS_USED=1.0,
>> HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8,
>> METASPACE_MEMORY_USED=1.40677456E8, MANAGED_MEMORY_USED=0.0}
>>
>> 2024-06-06 21:33:35,306 o.a.f.a.ScalingTracking        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Cannot record restart duration because already set in the latest record:
>> PT0.114185S
>>
>> 2024-06-06 21:33:35,307 o.a.f.a.JobAutoScalerImpl      
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Collected metrics:
>> CollectedMetricHistory(jobTopology=JobTopology(vertexInfos={cbc357ccb763df2852fee8c4fc7d55f2=VertexInfo(id=cbc357ccb763df2852fee8c4fc7d55f2,
>> inputs={}, outputs={61214243927da46230dfd349fba7b8e6=REBALANCE,
>> 7758b2b5ada48872db09a5c48176e34e=HASH}, parallelism=1, maxParallelism=1,
>> originalMaxParallelism=20, finished=false,
>> ioMetrics=IOMetrics(numRecordsIn=0, numRecordsOut=125,
>> accumulatedBusyTime=32301.0)),
>> eab9c0013081b8479e60463931f3a593=VertexInfo(id=eab9c0013081b8479e60463931f3a593,
>> inputs={7758b2b5ada48872db09a5c48176e34e=REBALANCE}, outputs={},
>> parallelism=1, maxParallelism=1, originalMaxParallelism=1, finished=false,
>> ioMetrics=IOMetrics(numRecordsIn=8, numRecordsOut=0,
>> accumulatedBusyTime=0.0)),
>> 61214243927da46230dfd349fba7b8e6=VertexInfo(id=61214243927da46230dfd349fba7b8e6,
>> inputs={cbc357ccb763df2852fee8c4fc7d55f2=REBALANCE}, outputs={},
>> parallelism=1, maxParallelism=1, originalMaxParallelism=1, finished=false,
>> ioMetrics=IOMetrics(numRecordsIn=8, numRecordsOut=0,
>> accumulatedBusyTime=0.0)),
>> 7758b2b5ada48872db09a5c48176e34e=VertexInfo(id=7758b2b5ada48872db09a5c48176e34e,
>> inputs={cbc357ccb763df2852fee8c4fc7d55f2=HASH},
>> outputs={eab9c0013081b8479e60463931f3a593=REBALANCE}, parallelism=1,
>> maxParallelism=20, originalMaxParallelism=20, finished=false,
>> ioMetrics=IOMetrics(numRecordsIn=117, numRecordsOut=8,
>> accumulatedBusyTime=0.0))}, finishedVertices=[],
>> verticesInTopologicalOrder=[cbc357ccb763df2852fee8c4fc7d55f2,
>> 61214243927da46230dfd349fba7b8e6, 7758b2b5ada48872db09a5c48176e34e,
>> eab9c0013081b8479e60463931f3a593]),
>> metricHistory={2024-06-06T21:32:35.170678Z=CollectedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={ACCUMULATED_BUSY_TIME=20821.0,
>> NUM_RECORDS_OUT=109.0, LOAD=0.0, NUM_RECORDS_IN=512339.0,
>> OBSERVED_TPR=Infinity, LAG=0.0},
>> eab9c0013081b8479e60463931f3a593={ACCUMULATED_BUSY_TIME=0.0,
>> NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=7.0},
>> 61214243927da46230dfd349fba7b8e6={ACCUMULATED_BUSY_TIME=0.0,
>> NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=7.0},
>> 7758b2b5ada48872db09a5c48176e34e={ACCUMULATED_BUSY_TIME=0.0,
>> NUM_RECORDS_OUT=7.0, LOAD=0.0, NUM_RECORDS_IN=102.0}},
>> globalMetrics={NUM_TASK_SLOTS_USED=1.0,
>> HEAP_MAX_USAGE_RATIO=0.5849425971687019, HEAP_MEMORY_USED=4.08495608E8,
>> METASPACE_MEMORY_USED=1.43093792E8, MANAGED_MEMORY_USED=0.0}),
>> 2024-06-06T21:33:35.258489Z=CollectedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={ACCUMULATED_BUSY_TIME=32301.0,
>> NUM_RECORDS_OUT=125.0, LOAD=0.192, NUM_RECORDS_IN=613279.0,
>> OBSERVED_TPR=Infinity, LAG=0.0},
>> eab9c0013081b8479e60463931f3a593={ACCUMULATED_BUSY_TIME=0.0,
>> NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=8.0},
>> 61214243927da46230dfd349fba7b8e6={ACCUMULATED_BUSY_TIME=0.0,
>> NUM_RECORDS_OUT=0.0, LOAD=0.0, NUM_RECORDS_IN=8.0},
>> 7758b2b5ada48872db09a5c48176e34e={ACCUMULATED_BUSY_TIME=0.0,
>> NUM_RECORDS_OUT=8.0, LOAD=0.0, NUM_RECORDS_IN=117.0}},
>> globalMetrics={NUM_TASK_SLOTS_USED=1.0,
>> HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8,
>> METASPACE_MEMORY_USED=1.40677456E8, MANAGED_MEMORY_USED=0.0})},
>> jobRunningTs=2024-06-06T21:17:35.712Z, fullyCollected=true)
>>
>> 2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Restart time used in metrics evaluation: PT5M
>>
>> 2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Using busy time based tpr 17498.932104004747 for
>> cbc357ccb763df2852fee8c4fc7d55f2.
>>
>> 2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computing edge (cbc357ccb763df2852fee8c4fc7d55f2,
>> 61214243927da46230dfd349fba7b8e6) data rate for single input downstream task
>>
>> 2024-06-06 21:33:35,307 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed output ratio for edge (cbc357ccb763df2852fee8c4fc7d55f2 ->
>> 61214243927da46230dfd349fba7b8e6) : 9.906875371507826E-6
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computing edge (cbc357ccb763df2852fee8c4fc7d55f2,
>> 7758b2b5ada48872db09a5c48176e34e) data rate for single input downstream task
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed output ratio for edge (cbc357ccb763df2852fee8c4fc7d55f2 ->
>> 7758b2b5ada48872db09a5c48176e34e) : 1.486031305726174E-4
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computing edge (7758b2b5ada48872db09a5c48176e34e,
>> eab9c0013081b8479e60463931f3a593) data rate for single input downstream task
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingMetricEvaluator 
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed output ratio for edge (7758b2b5ada48872db09a5c48176e34e ->
>> eab9c0013081b8479e60463931f3a593) : 0.06666666666666667
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobAutoScalerImpl      
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Evaluated metrics:
>> EvaluatedMetrics(vertexMetrics={cbc357ccb763df2852fee8c4fc7d55f2={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
>> average=1679.897), PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN),
>> SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=83995.0,
>> average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
>> average=17498.932), LOAD=EvaluatedScalingMetric(current=NaN,
>> average=0.096),
>> SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
>> average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
>> average=NaN), LAG=EvaluatedScalingMetric(current=0.0, average=NaN)},
>> eab9c0013081b8479e60463931f3a593={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
>> average=0.017), PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=1.0,
>> average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
>> average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
>> SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
>> average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
>> average=NaN)},
>> 61214243927da46230dfd349fba7b8e6={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
>> average=0.017), PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=1.0,
>> average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
>> average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
>> SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
>> average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
>> average=NaN)},
>> 7758b2b5ada48872db09a5c48176e34e={TARGET_DATA_RATE=EvaluatedScalingMetric(current=NaN,
>> average=0.25), PARALLELISM=EvaluatedScalingMetric(current=1.0,
>> average=NaN), SCALE_UP_RATE_THRESHOLD=EvaluatedScalingMetric(current=13.0,
>> average=NaN), MAX_PARALLELISM=EvaluatedScalingMetric(current=20.0,
>> average=NaN), TRUE_PROCESSING_RATE=EvaluatedScalingMetric(current=NaN,
>> average=Infinity), LOAD=EvaluatedScalingMetric(current=NaN, average=0.0),
>> SCALE_DOWN_RATE_THRESHOLD=EvaluatedScalingMetric(current=Infinity,
>> average=NaN), CATCH_UP_DATA_RATE=EvaluatedScalingMetric(current=0.0,
>> average=NaN)}},
>> globalMetrics={HEAP_MAX_USAGE_RATIO=EvaluatedScalingMetric(current=0.68,
>> average=0.632), NUM_TASK_SLOTS_USED=EvaluatedScalingMetric(current=1.0,
>> average=NaN), GC_PRESSURE=EvaluatedScalingMetric(current=NaN, average=NaN),
>> HEAP_MEMORY_USED=EvaluatedScalingMetric(current=4.74886648E8,
>> average=4.41691128E8),
>> METASPACE_MEMORY_USED=EvaluatedScalingMetric(current=1.40677456E8,
>> average=1.41885624E8),
>> MANAGED_MEMORY_USED=EvaluatedScalingMetric(current=0.0, average=0.0)})
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.ScalingExecutor        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Restart time used in scaling summary computation: PT5M
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Target processing capacity for cbc357ccb763df2852fee8c4fc7d55f2 is 168270.0
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Capped target processing capacity for cbc357ccb763df2852fee8c4fc7d55f2 is
>> 168270.0
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Specified autoscaler maximum parallelism 200 is greater than the operator
>> max parallelism 1. This means the operator max parallelism can never be
>> reached.
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Target processing capacity for eab9c0013081b8479e60463931f3a593 is 2.0
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed scale factor of 0.0 for eab9c0013081b8479e60463931f3a593 is capped
>> by maximum scale down factor to 0.4
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Capped target processing capacity for eab9c0013081b8479e60463931f3a593 is
>> Infinity
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Specified autoscaler maximum parallelism 200 is greater than the operator
>> max parallelism 1. This means the operator max parallelism can never be
>> reached.
>>
>> 2024-06-06 21:33:35,308 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Target processing capacity for 61214243927da46230dfd349fba7b8e6 is 2.0
>>
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed scale factor of 0.0 for 61214243927da46230dfd349fba7b8e6 is capped
>> by maximum scale down factor to 0.4
>>
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Capped target processing capacity for 61214243927da46230dfd349fba7b8e6 is
>> Infinity
>>
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Specified autoscaler maximum parallelism 200 is greater than the operator
>> max parallelism 1. This means the operator max parallelism can never be
>> reached.
>>
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Target processing capacity for 7758b2b5ada48872db09a5c48176e34e is 25.0
>>
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Computed scale factor of 0.0 for 7758b2b5ada48872db09a5c48176e34e is capped
>> by maximum scale down factor to 0.4
>>
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Capped target processing capacity for 7758b2b5ada48872db09a5c48176e34e is
>> Infinity
>>
>> 2024-06-06 21:33:35,309 o.a.f.a.JobVertexScaler        
>> [DEBUG][flink/pipeline-pipelinelocal]
>> Specified autoscaler maximum parallelism 200 is greater than the operator
>> max parallelism 20. This means the operator max parallelism can never be
>> reached.
>>
>> 2024-06-06 21:33:35,309 o.a.f.a.ScalingExecutor        [INFO ][flink/
>> pipeline-pipelinelocal] All job vertices are currently running at their
>> target parallelism.
>>
>>
>> Some Questions
>>
>> 1. Does the autoscaler decide to scale a job vertex when the target
>> processing capacity is higher than the current processing capacity? If yes,
>> how do I check the current processing capacity?
>>
>> 2. Which metrics in the above logs show that the target utilization threshold
>> is not reached and hence all the vertices are running at target parallelism?
>>
>>
>> Thank you
>> Chetas
>>
>>
>>
