Re: Understanding flink-autoscaler behavior

2024-06-24 Thread Chetas Joshi
Hello,

After disabling the adaptive scheduler, I was able to have the operator stop
the job with a savepoint and resume the job from that savepoint after the
upgrade. However, I observed that the upgrade lifecycle is quite slow, as it
takes down and then brings back up all the task managers. I am wondering if
there are ways to have the operator stop the job with a savepoint and resume
from it without taking down the task managers. If I switch from application
mode to session mode (with only one job on that session cluster) and use the
adaptive scheduler, would that solve the problem?
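
For context, the savepoint-based upgrade described above corresponds roughly
to the following FlinkDeployment settings (a minimal sketch; the exact keys
depend on the operator and Flink versions in use):

spec:
  flinkConfiguration:
    # adaptive scheduler disabled: the line below is removed/omitted
    # jobmanager.scheduler: adaptive
    job.autoscaler.enabled: "true"
  job:
    upgradeMode: savepoint

With upgradeMode: savepoint and the adaptive scheduler disabled, every
upgrade (including one triggered by a parallelism change) goes through
stop-with-savepoint and a full redeploy, which matches the slow lifecycle
described above.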

Thanks
Chetas

On Wed, Jun 12, 2024 at 7:13 PM Chetas Joshi  wrote:

> Got it. Thanks!
>
> On Wed, Jun 12, 2024 at 6:49 PM Zhanghao Chen 
> wrote:
>
>> > Does this mean it won't trigger a checkpoint before scaling up or
>> scaling down?
>>
>> The in-place rescaling won't do that.
>>
>> > Do I need the autotuning to be turned on for exactly once processing?
>>
>> It suffices to go back to the full-restart upgrade mode provided by the
>> operator: disable the adaptive scheduler and set the upgradeMode to
>> savepoint [1]. The operator will stop the job with a final savepoint and
>> resume the job from that savepoint after the upgrade.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Chetas Joshi 
>> *Sent:* Thursday, June 13, 2024 6:33
>> *To:* Zhanghao Chen 
>> *Cc:* Sachin Sharma ; Gyula Fóra <
>> gyula.f...@gmail.com>; Oscar Perez via user 
>>
>> *Subject:* Re: Understanding flink-autoscaler behavior
>>
>> Hi Zhanghao,
>>
>> If I am using the autoscaler (flink-k8s-operator) without enabling
>> auto-tuning, the documentation says it triggers in-place scaling without
>> going through a full job upgrade lifecycle. Does this mean it won't trigger
>> a checkpoint before scaling up or scaling down?
>>
>> I am observing that entries are being read from Kafka (the source in my
>> flink app) multiple times if the flink app goes through scaling. Do I need
>> the autotuning to be turned on for exactly once processing?
>>
>> Thanks
>> Chetas
>>
>> On Fri, Jun 7, 2024 at 8:28 PM Zhanghao Chen 
>> wrote:
>>
>> Hi,
>>
>> Reactive mode and the Autoscaler in Kubernetes operator are two different
>> approaches towards elastic scaling of Flink.
>>
>> Reactive mode [1] has to be used together with the passive resource
>> management approach of Flink (only Standalone mode takes this approach),
>> where the TM number is controlled by external systems and the job
>> parallelism is derived from the available resources in a reactive manner.
>> The workflow is as follows:
>>
>> An external service (like the K8s HPA controller) monitors the job load and
>> adjusts the TM number to match it -> Flink adjusts the job parallelism *reactively*.
>>
>> The Autoscaler [2] is to be used with the active resource management
>> approach of Flink (YARN/K8s Application/Session mode), where the TM number
>> is derived from the job's resource needs, and Flink actively requests/releases
>> TMs to match those needs. The workflow for the Autoscaler is:
>>
>> Autoscaler monitors job load and adjusts job parallelism *actively* to
>> match the load -> Flink adjusts TM number to match the job need.
>>
>> I would recommend using the Autoscaler instead of Reactive mode for two reasons:
>>
>>    1. The Autoscaler provides an all-in-one solution, while Reactive mode
>>    needs to be paired with an external system that adjusts the TM number.
>>    2. The Autoscaler has a fine-tuned scaling algorithm tailored for
>>    streaming workloads. It tunes each operator's parallelism individually and
>>    takes task load balancing into account, while Reactive mode just adjusts
>>    all operators' parallelism uniformly.
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode
>> [2]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/
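
To make the contrast above concrete, the two approaches are enabled through
different configuration (a minimal sketch; keys as documented in [1] and [2],
values illustrative):

# Reactive mode (standalone deployments only): an external system sizes the
# TMs, and Flink derives the parallelism from whatever resources show up.
scheduler-mode: reactive

# Autoscaler (flink-kubernetes-operator): the operator decides parallelism
# and Flink requests/releases TMs to match.
job.autoscaler.enabled: "true"
job.autoscaler.target.utilization: "0.6"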
>>
>>
>> Best,
>> Zhanghao Chen
>> --
>> *From:* Sachin Sharma 
>> *Sent:* Saturday, June 8, 2024 3:02
>> *To:* Gyula Fóra 
>> *Cc:* Chetas Joshi ; Oscar Perez via user <
>> user@flink.apache.org>
>> *Subject:* Re: Understanding flink-autoscaler behavior
>>
>> Hi,
>>

Re: Understanding flink-autoscaler behavior

2024-06-12 Thread Chetas Joshi
Got it. Thanks!

On Wed, Jun 12, 2024 at 6:49 PM Zhanghao Chen 
wrote:

> > Does this mean it won't trigger a checkpoint before scaling up or
> scaling down?
>
> The in-place rescaling won't do that.
>
> > Do I need the autotuning to be turned on for exactly once processing?
>
> It suffices to go back to the full-restart upgrade mode provided by the
> operator: disable the adaptive scheduler and set the upgradeMode to
> savepoint [1]. The operator will stop the job with a final savepoint and
> resume the job from that savepoint after the upgrade.
>
> [1]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>
>
> Best,
> Zhanghao Chen
> --
> *From:* Chetas Joshi 
> *Sent:* Thursday, June 13, 2024 6:33
> *To:* Zhanghao Chen 
> *Cc:* Sachin Sharma ; Gyula Fóra <
> gyula.f...@gmail.com>; Oscar Perez via user 
>
> *Subject:* Re: Understanding flink-autoscaler behavior
>
> Hi Zhanghao,
>
> If I am using the autoscaler (flink-k8s-operator) without enabling
> auto-tuning, the documentation says it triggers in-place scaling without
> going through a full job upgrade lifecycle. Does this mean it won't trigger
> a checkpoint before scaling up or scaling down?
>
> I am observing that entries are being read from Kafka (the source in my
> flink app) multiple times if the flink app goes through scaling. Do I need
> the autotuning to be turned on for exactly once processing?
>
> Thanks
> Chetas
>
> On Fri, Jun 7, 2024 at 8:28 PM Zhanghao Chen 
> wrote:
>
> Hi,
>
> Reactive mode and the Autoscaler in Kubernetes operator are two different
> approaches towards elastic scaling of Flink.
>
> Reactive mode [1] has to be used together with the passive resource
> management approach of Flink (only Standalone mode takes this approach),
> where the TM number is controlled by external systems and the job
> parallelism is derived from the available resources in a reactive manner.
> The workflow is as follows:
>
> An external service (like the K8s HPA controller) monitors the job load and
> adjusts the TM number to match it -> Flink adjusts the job parallelism *reactively*.
>
> The Autoscaler [2] is to be used with the active resource management
> approach of Flink (YARN/K8s Application/Session mode), where the TM number
> is derived from the job's resource needs, and Flink actively requests/releases
> TMs to match those needs. The workflow for the Autoscaler is:
>
> Autoscaler monitors job load and adjusts job parallelism *actively* to
> match the load -> Flink adjusts TM number to match the job need.
>
> I would recommend using the Autoscaler instead of Reactive mode for two reasons:
>
>    1. The Autoscaler provides an all-in-one solution, while Reactive mode
>    needs to be paired with an external system that adjusts the TM number.
>    2. The Autoscaler has a fine-tuned scaling algorithm tailored for
>    streaming workloads. It tunes each operator's parallelism individually and
>    takes task load balancing into account, while Reactive mode just adjusts
>    all operators' parallelism uniformly.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode
> [2]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/
>
>
> Best,
> Zhanghao Chen
> --
> *From:* Sachin Sharma 
> *Sent:* Saturday, June 8, 2024 3:02
> *To:* Gyula Fóra 
> *Cc:* Chetas Joshi ; Oscar Perez via user <
> user@flink.apache.org>
> *Subject:* Re: Understanding flink-autoscaler behavior
>
> Hi,
>
> I have a question related to this.
>
> I am doing a POC with Kubernetes operator 1.8 and Flink 1.18 with Reactive
> mode enabled. I added some dummy slow and fast operators to the Flink job,
> and I can see back pressure accumulating, but I am not sure why my Flink
> task managers are not scaled by the operator. Also, can someone explain
> whether the autoscaler's job is just to add more task managers, with
> Reactive mode then adjusting the parallelism based on the configuration? As
> per the Operator documentation, users configure the target utilization
> percentage of the operators in the pipeline, e.g. keep all operators between
> 60% - 80% busy. The autoscaler then finds a parallelism configuration such
> that the output rates of all operators match the input rates of all their
> downstream operators at the targeted utilization. So does the autoscaler
> (part of the Kubernetes operator) control the parallelism, or does Reactive
> mode in the Flink job control it?
>
> Thanks & Regards,
> Sachin Sharma

Re: Understanding flink-autoscaler behavior

2024-06-12 Thread Chetas Joshi
Hi Zhanghao,

If I am using the autoscaler (flink-k8s-operator) without enabling
auto-tuning, the documentation says it triggers in-place scaling without
going through a full job upgrade lifecycle. Does this mean it won't trigger
a checkpoint before scaling up or scaling down?

I am observing that entries are being read from Kafka (the source in my
flink app) multiple times if the flink app goes through scaling. Do I need
the autotuning to be turned on for exactly once processing?

Thanks
Chetas

On Fri, Jun 7, 2024 at 8:28 PM Zhanghao Chen 
wrote:

> Hi,
>
> Reactive mode and the Autoscaler in Kubernetes operator are two different
> approaches towards elastic scaling of Flink.
>
> Reactive mode [1] has to be used together with the passive resource
> management approach of Flink (only Standalone mode takes this approach),
> where the TM number is controlled by external systems and the job
> parallelism is derived from the available resources in a reactive manner.
> The workflow is as follows:
>
> An external service (like the K8s HPA controller) monitors the job load and
> adjusts the TM number to match it -> Flink adjusts the job parallelism *reactively*.
>
> The Autoscaler [2] is to be used with the active resource management
> approach of Flink (YARN/K8s Application/Session mode), where the TM number
> is derived from the job's resource needs, and Flink actively requests/releases
> TMs to match those needs. The workflow for the Autoscaler is:
>
> Autoscaler monitors job load and adjusts job parallelism *actively* to
> match the load -> Flink adjusts TM number to match the job need.
>
> I would recommend using the Autoscaler instead of Reactive mode for two reasons:
>
>    1. The Autoscaler provides an all-in-one solution, while Reactive mode
>    needs to be paired with an external system that adjusts the TM number.
>    2. The Autoscaler has a fine-tuned scaling algorithm tailored for
>    streaming workloads. It tunes each operator's parallelism individually and
>    takes task load balancing into account, while Reactive mode just adjusts
>    all operators' parallelism uniformly.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#reactive-mode
> [2]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/
>
>
> Best,
> Zhanghao Chen
> --
> *From:* Sachin Sharma 
> *Sent:* Saturday, June 8, 2024 3:02
> *To:* Gyula Fóra 
> *Cc:* Chetas Joshi ; Oscar Perez via user <
> user@flink.apache.org>
> *Subject:* Re: Understanding flink-autoscaler behavior
>
> Hi,
>
> I have a question related to this.
>
> I am doing a POC with Kubernetes operator 1.8 and Flink 1.18 with Reactive
> mode enabled. I added some dummy slow and fast operators to the Flink job,
> and I can see back pressure accumulating, but I am not sure why my Flink
> task managers are not scaled by the operator. Also, can someone explain
> whether the autoscaler's job is just to add more task managers, with
> Reactive mode then adjusting the parallelism based on the configuration? As
> per the Operator documentation, users configure the target utilization
> percentage of the operators in the pipeline, e.g. keep all operators between
> 60% - 80% busy. The autoscaler then finds a parallelism configuration such
> that the output rates of all operators match the input rates of all their
> downstream operators at the targeted utilization. So does the autoscaler
> (part of the Kubernetes operator) control the parallelism, or does Reactive
> mode in the Flink job control it?
>
> Thanks & Regards,
> Sachin Sharma
>
>
>
>
> On Fri, Jun 7, 2024 at 4:55 AM Gyula Fóra  wrote:
>
> Hi!
>
> To simplify things, you can generally look at TRUE_PROCESSING_RATE,
> SCALE_UP_RATE_THRESHOLD and SCALE_DOWN_RATE_THRESHOLD.
> If TPR is below the scale-up threshold then we should scale up, and if it's
> above the scale-down threshold then we scale down.
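
A rough illustration of this decision rule (not the actual autoscaler code;
the TPR and scale-up values are taken from the log excerpt below, and the
scale-down threshold is a hypothetical value):

public class ScalingDecisionSketch {
    public static void main(String[] args) {
        double trueProcessingRate = 17498;    // TPR of the source vertex
        double scaleUpThreshold = 83995;      // SCALE_UP_RATE_THRESHOLD
        double scaleDownThreshold = 170000;   // hypothetical SCALE_DOWN_RATE_THRESHOLD

        if (trueProcessingRate < scaleUpThreshold) {
            // The vertex cannot keep up with the required rate: scale up,
            // unless capped by max parallelism (e.g. the number of Kafka partitions).
            System.out.println("scale up");
        } else if (trueProcessingRate > scaleDownThreshold) {
            // The vertex has far more capacity than needed: scale down.
            System.out.println("scale down");
        } else {
            System.out.println("keep current parallelism");
        }
    }
}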
>
> In your case what we see for your source
> (cbc357ccb763df2852fee8c4fc7d55f2) as logged is that:
>
> TPR: 17498
> SCALE_UP_THRESHOLD: 83995
>
> So it should definitely be scaled up in theory, however you can also see:
> `Updating source cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based
> on available partitions to 1`
>
> This means that the source max parallelism was determined by the available
> Kafka partitions to be 1. It would not make sense to increase the
> parallelism, even though we are clearly falling behind, as the source cannot
> consume a single partition in parallel.
>
> Hope this helps
> Gyula
>
> On Fri, Jun 7, 2024 at 3:41 AM Chetas Joshi 
> wrote:
>
> Hi Community,
>
> I want to understand the following logs from the flink-k8s-operator autoscaler.

Understanding flink-autoscaler behavior

2024-06-06 Thread Chetas Joshi
Hi Community,

I want to understand the following logs from the flink-k8s-operator
autoscaler. My Flink pipeline, running on 1.18.0 and using the
flink-k8s-operator (1.8.0), is not scaling up even though the source vertex
is back-pressured.


2024-06-06 21:33:35,270 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Updating source cbc357ccb763df2852fee8c4fc7d55f2 max parallelism based on
available partitions to 1

2024-06-06 21:33:35,276 o.a.f.a.RestApiMetricsCollector
[DEBUG][flink/pipeline-pipelinelocal]
Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC,
Source__dd-log-source.numRecordsOut=SOURCE_TASK_NUM_RECORDS_OUT,
Source__dd-log-source.numRecordsIn=SOURCE_TASK_NUM_RECORDS_IN,
Source__dd-log-source.pendingRecords=PENDING_RECORDS,
Source__dd-log-source.numRecordsInPerSecond=SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
backPressuredTimeMsPerSecond=BACKPRESSURE_TIME_PER_SEC} for
cbc357ccb763df2852fee8c4fc7d55f2

2024-06-06 21:33:35,282 o.a.f.a.RestApiMetricsCollector
[DEBUG][flink/pipeline-pipelinelocal]
Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
61214243927da46230dfd349fba7b8e6

2024-06-06 21:33:35,286 o.a.f.a.RestApiMetricsCollector
[DEBUG][flink/pipeline-pipelinelocal]
Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
7758b2b5ada48872db09a5c48176e34e

2024-06-06 21:33:35,291 o.a.f.a.RestApiMetricsCollector
[DEBUG][flink/pipeline-pipelinelocal]
Querying metrics {busyTimeMsPerSecond=BUSY_TIME_PER_SEC} for
eab9c0013081b8479e60463931f3a593

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Calculating vertex scaling metrics for cbc357ccb763df2852fee8c4fc7d55f2
from
{BACKPRESSURE_TIME_PER_SEC=AggregatedMetric{id='backPressuredTimeMsPerSecond',
mim='0.0', max='0.0', avg='0.0', sum='0.0'},
BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond', mim='192.0',
max='192.0', avg='192.0', sum='192.0'},
SOURCE_TASK_NUM_RECORDS_OUT=AggregatedMetric{id='Source__dd-log-source.numRecordsOut',
mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'},
PENDING_RECORDS=AggregatedMetric{id='Source__dd-log-source.pendingRecords',
mim='0.0', max='0.0', avg='0.0', sum='0.0'},
SOURCE_TASK_NUM_RECORDS_IN=AggregatedMetric{id='Source__dd-log-source.numRecordsIn',
mim='613279.0', max='613279.0', avg='613279.0', sum='613279.0'},
SOURCE_TASK_NUM_RECORDS_IN_PER_SEC=AggregatedMetric{id='Source__dd-log-source.numRecordsInPerSecond',
mim='1682.7', max='1682.7',
avg='1682.7', sum='1682.7'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Vertex scaling metrics for cbc357ccb763df2852fee8c4fc7d55f2:
{ACCUMULATED_BUSY_TIME=32301.0, NUM_RECORDS_OUT=125.0, LOAD=0.192,
NUM_RECORDS_IN=613279.0, OBSERVED_TPR=Infinity, LAG=0.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Calculating vertex scaling metrics for eab9c0013081b8479e60463931f3a593
from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
mim='0.0', max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Vertex scaling metrics for eab9c0013081b8479e60463931f3a593:
{ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=0.0, LOAD=0.0,
NUM_RECORDS_IN=8.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Calculating vertex scaling metrics for 61214243927da46230dfd349fba7b8e6
from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
mim='0.0', max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Vertex scaling metrics for 61214243927da46230dfd349fba7b8e6:
{ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=0.0, LOAD=0.0,
NUM_RECORDS_IN=8.0}

2024-06-06 21:33:35,304 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Calculating vertex scaling metrics for 7758b2b5ada48872db09a5c48176e34e
from {BUSY_TIME_PER_SEC=AggregatedMetric{id='busyTimeMsPerSecond',
mim='0.0', max='0.0', avg='0.0', sum='0.0'}}

2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Vertex scaling metrics for 7758b2b5ada48872db09a5c48176e34e:
{ACCUMULATED_BUSY_TIME=0.0, NUM_RECORDS_OUT=8.0, LOAD=0.0,
NUM_RECORDS_IN=117.0}

2024-06-06 21:33:35,305 o.a.f.a.ScalingMetricCollector
[DEBUG][flink/pipeline-pipelinelocal]
Global metrics: {NUM_TASK_SLOTS_USED=1.0,
HEAP_MAX_USAGE_RATIO=0.6800108099126959, HEAP_MEMORY_USED=4.74886648E8,
METASPACE_MEMORY_USED=1.40677456E8, MANAGED_MEMORY_USED=0.0}

2024-06-06 21:33:35,306 o.a.f.a.ScalingTracking
[DEBUG][flink/pipeline-pipelinelocal]
Cannot record restart duration because already set in the latest record:
PT0.114185S

2024-06-06 21:33:35,307 o.a.f.a.JobAutoScalerImpl
[DEBUG][flink/pipeline-pipelinelocal]
Collected metrics:

Ways to detect a scaling event within a flink operator at runtime

2024-05-23 Thread Chetas Joshi
Hello,

On a k8s cluster, I have the flink-k8s-operator 1.8 running with the
autoscaler enabled (in-place) and a flinkDeployment (application mode)
running Flink 1.18.1.

The flinkDeployment, i.e. the Flink streaming application, has a mock data
producer as the source. The source generates data points every X milliseconds
to be processed (aggregated) by the downstream operators. The aggregated
results are written to Iceberg.

The pipeline starts with default-parallelism = 1, i.e. all the job vertices
start with parallelism = 1, and X = 0 so all data points are generated
continuously. Due to the lag associated with the aggregation and sink, the
source experiences backpressure and hence the autoscaler triggers a scale-up.
I want to slow down the rate of data production by the source after the first
scale-up event. What are the ways I can detect the scale-up event so that the
source can dynamically adjust (increase) X at runtime? I am wondering if
there is a way to detect, from within the source operator at runtime, whether
the parallelism of any job vertex in the Flink execution graph has gone
above 1.

This is a test pipeline (Flink app) and the goal is to test the scale-up
and scale-down events; thus I need to increase X in order to have a
scale-down event triggered afterwards.
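
One option (a hedged sketch, not an established pattern from the docs): poll
the JobManager REST API from a helper thread in the source and check whether
any vertex of the running job reports parallelism > 1. The JobManager URL and
job id are placeholders, and the crude regex scan of the JSON response is
only there to keep the example dependency-free:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScaleUpProbe {
    // Returns true if any job vertex currently reports parallelism > 1.
    public static boolean anyVertexAboveOne(String jobManagerUrl, String jobId)
            throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(jobManagerUrl + "/jobs/" + jobId))
                .GET()
                .build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        Matcher m = Pattern.compile("\"parallelism\"\\s*:\\s*(\\d+)").matcher(body);
        while (m.find()) {
            if (Integer.parseInt(m.group(1)) > 1) {
                return true;
            }
        }
        return false;
    }
}

The source could call anyVertexAboveOne periodically and increase X once it
returns true.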

Thank you
Chetas


Re: Flink autoscaler with AWS ASG: checkpoint access issue

2024-05-20 Thread Chetas Joshi
Hello,

After digging into the 403 issue a bit, I figured out that after the
scale-up event, the flink-s3-fs-presto plugin uses the node profile instead
of IRSA (IAM Roles for Service Accounts) on some of the newly created TM pods.

1. Has anyone else experienced this as well?
2. I verified that this is an issue with the flink-s3-fs-presto plugin: if I
switch to the hadoop plugin (see the sketch below), I don't run into 403
errors after the scale-up events.
3. Why is the presto plugin recommended over the hadoop plugin when working
with the checkpoint files in S3?
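
For reference, switching to the hadoop plugin on the official Flink images is
typically done by enabling the corresponding plugin jar, for example via the
image's ENABLE_BUILT_IN_PLUGINS mechanism in the FlinkDeployment pod template
(a minimal sketch; the jar name must match your Flink version):

spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.18.1.jar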

Thank you
Chetas

On Mon, May 13, 2024 at 6:59 PM Chetas Joshi  wrote:

> Hello,
>
> Set up
>
> I am running my Flink streaming jobs (upgradeMode = stateless) on an AWS
> EKS cluster. The node-type for the pods of the streaming jobs belongs to a
> node-group that has an AWS ASG (auto scaling group).
> The streaming jobs are the FlinkDeployments managed by the
> flink-k8s-operator (1.8) and I have enabled the job autoscaler.
>
> Scenario
>
> When the flink auto-scaler scales up a flink streaming job, new flink TMs
> are first added onto any existing nodes with available resources. If
> resources are not enough to schedule all the TM pods,  ASG adds new nodes
> to the EKS cluster and the rest of the TM pods are scheduled on these new
> nodes.
>
> Issue
>
> After the scale-up, the TM pods scheduled on the existing nodes with
> available resources successfully read the checkpoint from S3 however the TM
> pods scheduled on the new nodes added by ASG run into 403 (access denied)
> while reading the same checkpoint file from the checkpoint location in S3.
>
> Just FYI: I have disabled memory auto-tuning so the auto-scaling events
> are in place.
>
> 1. The IAM role associated with the service account being used by the
> FlinkDeployment is as expected for the new pods.
> 2. I am able to reproduce this issue every single time there is a scale-up
> that requires ASG to add new nodes to the cluster.
> 3. If I delete the FlinkDeployment and allow the operator to restart it,
> it starts and stops throwing 403.
> 4. I am also observing some 404 (not found) being reported by certain
> newly added TM pods. They are looking for an older checkpoint (for example
> looking for chk10 while a chk11 has already been created in S3 and chk10
> would have gotten subsumed by chk11)
>
> I would appreciate it if there are any pointers on how to debug this
> further.
>
> Let me know if you need more information.
>
> Thank you
> Chetas
>
>


Flink autoscaler with AWS ASG: checkpoint access issue

2024-05-13 Thread Chetas Joshi
Hello,

Set up

I am running my Flink streaming jobs (upgradeMode = stateless) on an AWS
EKS cluster. The node-type for the pods of the streaming jobs belongs to a
node-group that has an AWS ASG (auto scaling group).
The streaming jobs are the FlinkDeployments managed by the
flink-k8s-operator (1.8) and I have enabled the job autoscaler.

Scenario

When the flink auto-scaler scales up a flink streaming job, new flink TMs
are first added onto any existing nodes with available resources. If
resources are not enough to schedule all the TM pods,  ASG adds new nodes
to the EKS cluster and the rest of the TM pods are scheduled on these new
nodes.

Issue

After the scale-up, the TM pods scheduled on the existing nodes with
available resources successfully read the checkpoint from S3 however the TM
pods scheduled on the new nodes added by ASG run into 403 (access denied)
while reading the same checkpoint file from the checkpoint location in S3.

Just FYI: I have disabled memory auto-tuning so the auto-scaling events are
in place.

1. The IAM role associated with the service account being used by the
FlinkDeployment is as expected for the new pods.
2. I am able to reproduce this issue every single time there is a scale-up
that requires ASG to add new nodes to the cluster.
3. If I delete the FlinkDeployment and allow the operator to restart it, it
starts and stops throwing 403.
4. I am also observing some 404 (not found) being reported by certain newly
added TM pods. They are looking for an older checkpoint (for example
looking for chk10 while a chk11 has already been created in S3 and chk10
would have gotten subsumed by chk11)

I would appreciate it if there are any pointers on how to debug this
further.

Let me know if you need more information.

Thank you
Chetas


Re: Flink scheduler keeps trying to schedule the pods indefinitely

2024-05-08 Thread Chetas Joshi
Hey Gyula,

Thanks for getting back.

1) Yes, some more testing revealed the job was able to start with lower
parallelism i.e. lower than the upper bound that was set by the adaptive
scheduler.
2) I am limiting the parallelism of any job-vertex by setting
pipeline.max-parallelism to a value that keeps the number of TMs in check
for a given capacity on my EKS cluster.

Chetas



On Sun, May 5, 2024 at 11:39 PM Gyula Fóra  wrote:

> Hey!
>
> Let me first answer your questions then provide some actual solution
> hopefully :)
>
> 1. The adaptive scheduler would not reduce the vertex's desired parallelism
> in this case, but it should allow the job to start, depending on the
> lower/upper bound resource config. There have been some changes in how the
> k8s operator sets these resource requirements; in the latest 1.8.0 we only
> set the upper bound so that the job can still start with a smaller
> parallelism. So Flink will ultimately keep trying to schedule pods, but
> ideally the job would also start/run. I would look at the scheduler logs
> (maybe at debug level) for more detail.
>
> You can look at configs like:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-wait-timeout
>
> 2. Default scheduler here refers to the Kubernetes pod scheduler not
> Flink's schedulers. So this is normal
>
> As for the solution to the problem: the thing to do is to make the
> autoscaler aware of the resource limits in the first place so that we don't
> scale the job too high. There has been some work on autodetecting these
> limits: https://issues.apache.org/jira/browse/FLINK-33771
>
> You can set:
> kubernetes.operator.cluster.resource-view.refresh-interval: 5 min
>
> to turn this on. Alternatively a simpler approach would be to directly
> limit the parallelism of the scaling decisions:
> job.autoscaler.vertex.max-parallelism
>
> Cheers,
> Gyula
>
> On Mon, May 6, 2024 at 8:09 AM Chetas Joshi 
> wrote:
>
>> Hello,
>>
>> I am running a flink job in the application mode on k8s. It's deployed as
>> a FlinkDeployment and its life-cycle is managed by the flink-k8s-operator.
>> The autoscaler is being used with the following config
>>
>> job.autoscaler.enabled: true
>> job.autoscaler.metrics.window: 5m
>> job.autoscaler.stabilization.interval: 1m
>> job.autoscaler.target.utilization: 0.6
>> job.autoscaler.target.utilization.boundary: 0.2
>> pipeline.max-parallelism: 60
>> jobmanager.scheduler: adaptive
>>
>> During a scale-up event, the autoscaler increases the parallelism of one
>> of the job vertices to a higher value. This triggers a bunch of new task
>> managers to be scheduled on the EKS cluster (The node-group has an attached
>> ASG). Out of all the requested TM pods only some get scheduled and then the
>> cluster runs out of resources. The other TM pods remain in the "pending
>> mode" indefinitely and the job is stuck in the "restart" loop forever.
>>
>> 1. Shouldn't the adaptive scheduler reduce the vertex parallelism due to
>> the slots/TMs not being available?
>> 2. When I looked at the pods stuck in the pending state, I found them to
>> be reporting the following events:
>>
>> │   Warning  FailedScheduling   4m55s (x287 over 23h)   default-scheduler
>>   0/5 nodes are available: 1 Insufficient cpu, 1 node(s) didn't match
>> Pod's node affinity/selector, 3 Insufficient memory. preempti │
>>
>> │ on: 0/5 nodes are available: 1 Preemption is not helpful for
>> scheduling, 4 No preemption victims found for incoming pod.
>>   │
>>
>> │   Normal   NotTriggerScaleUp  3m26s (x8555 over 23h)
>> cluster-autoscaler  pod didn't trigger scale-up: 1 max node group size
>> reached
>>
>> The WARN suggests that the "default scheduler" is being used. Why is that
>> the case even though the adaptive scheduler is configured to be used?
>>
>> Appreciate it if you can shed some light on why this could be happening.
>>
>> Thanks
>> Chetas
>>
>


Flink scheduler keeps trying to schedule the pods indefinitely

2024-05-05 Thread Chetas Joshi
Hello,

I am running a flink job in the application mode on k8s. It's deployed as a
FlinkDeployment and its life-cycle is managed by the flink-k8s-operator.
The autoscaler is being used with the following config

job.autoscaler.enabled: true
job.autoscaler.metrics.window: 5m
job.autoscaler.stabilization.interval: 1m
job.autoscaler.target.utilization: 0.6
job.autoscaler.target.utilization.boundary: 0.2
pipeline.max-parallelism: 60
jobmanager.scheduler: adaptive

During a scale-up event, the autoscaler increases the parallelism of one of
the job vertices to a higher value. This triggers a bunch of new task
managers to be scheduled on the EKS cluster (The node-group has an attached
ASG). Out of all the requested TM pods only some get scheduled and then the
cluster runs out of resources. The other TM pods remain in the "pending
mode" indefinitely and the job is stuck in the "restart" loop forever.

1. Shouldn't the adaptive scheduler reduce the vertex parallelism due to
the slots/TMs not being available?
2. When I looked at the pods stuck in the pending state, I found them to be
reporting the following events:

│   Warning  FailedScheduling   4m55s (x287 over 23h)   default-scheduler   0/5
nodes are available: 1 Insufficient cpu, 1 node(s) didn't match Pod's node
affinity/selector, 3 Insufficient memory. preempti │

│ on: 0/5 nodes are available: 1 Preemption is not helpful for scheduling,
4 No preemption victims found for incoming pod.
│

│   Normal   NotTriggerScaleUp  3m26s (x8555 over 23h)  cluster-autoscaler  pod
didn't trigger scale-up: 1 max node group size reached

The WARN suggests that the "default scheduler" is being used. Why is that
the case even though the adaptive scheduler is configured to be used?

Appreciate it if you can shed some light on why this could be happening.

Thanks
Chetas


Re: Autoscaling with flink-k8s-operator 1.8.0

2024-05-02 Thread Chetas Joshi
Hi Gyula,

Thanks for getting back and explaining the difference in the
responsibilities of the autoscaler and the operator.

I figured out what the issue was.
Here is what I was trying to do: the autoscaler had initially down-scaled
(2->1) the flinkDeployment so there was
pipeline.jobvertex-parallelism-overrides set to 1 in the flink config
associated with the flinkDeployment. I manually deleted this config from
the config map and restarted the flinkDeployment, so the pipeline started
running with the default parallelism of 2. I was then expecting the
autoscaler to down-scale it again; however, that did not happen because the
autoscaler maintains the overrides in a separate configMap called
"autoscaler-", which had not changed. It recognized that the job
needed to be down-scaled based on the collected metrics, but it did not find
the need to override the parallelism in the "autoscaler-" configMap
because that was still set to the old value of 1. Hence the operator did not
call the rescale API.

Since the restart of the flinkDeployment was triggered manually and not by
the operator, the config managed by the autoscaler got out-of-sync with the
actual flink config of the flinkDeployment and hence the issue.
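
For reference, the override that the autoscaler writes (and that Gyula
suggests setting manually when debugging in-place scaling) is a plain map in
the Flink config; a sketch with hypothetical vertex ids:

pipeline.jobvertex-parallelism-overrides: "a1b2c3d4e5f60718293a4b5c6d7e8f90:1,0f9e8d7c6b5a49382716a5b4c3d2e1f0:2"

If this key (or the copy the autoscaler keeps in its own configMap) already
matches the desired parallelism, there is nothing new for the operator to
apply, which matches the behavior described above.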

Cheers,
Chetas

On Wed, May 1, 2024 at 10:21 PM Gyula Fóra  wrote:

> Hi Chetas,
>
> The operator logic itself would normally call the rescale api during the
> upgrade process, not the autoscaler module. The autoscaler module sets the
> correct config with the parallelism overrides, and then the operator
> performs the regular upgrade cycle (as when you yourself change something
> in the spec). If only the parallelism overrides change then it will use the
> rescale API, otherwise a full upgrade is triggered.
>
> Can you share the entire resource yaml and the logs from the operator
> related to the upgrade (after the scaling was triggered)? You can usually
> see from the logs why the in-place scaling wasn't used in a particular case.
> You can debug in-place scaling itself by completely disabling the
> autoscaler and manually setting pipeline.jobvertex-parallelism-overrides in
> the flink config.
>
> Cheers,
> Gyula
>
> On Thu, May 2, 2024 at 3:49 AM Chetas Joshi 
> wrote:
>
>> Hello,
>>
>> We recently upgraded the operator to 1.8.0 to leverage the new
>> autoscaling features (
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/).
>> The FlinkDeployment (application cluster) is set to flink v1_18 as well. I
>> am able to observe the following event being reported in the logs of the
>> operator.
>>
>> o.a.f.k.o.l.AuditUtils [INFO ][flink/devpipeline] >>> Event  |
>> Info| SCALINGREPORT   | Scaling execution enabled, begin scaling
>> vertices:{ Vertex ID  | Parallelism 2 -> 1 | Processing capacity
>> Infinity -> Infinity | Target data rate 7.85}{ Vertex ID  |
>> Parallelism 2 -> 1 | Processing capacity Infinity -> Infinity | Target data
>> rate 0.00}{ Vertex ID  | Parallelism 2 -> 1 | Processing capacity
>> Infinity -> Infinity | Target data rate 7.85}{ Vertex ID w |
>> Parallelism 2 -> 1 | Processing capacity 33235.72 -> 13294.29 | Target data
>> rate 6.65}
>>
>> But the in-place autoscaling is not getting triggered. My understanding
>> is that the autoscaler running within the k8s-operator should call the
>> rescale api endpoint of the FlinkDeployment (devpipeline)  with a
>> parallelism overrides map (vertexId => parallelism) and that should trigger
>> a redeploy of the jobGraph. But that is not happening. The restart of the
>> FlinkDeployment overrides the map (vertexId => parallelism) in the
>> configMap resource that stores the flink-config.
>>
>> Am I missing something? How do I debug this further?
>>
>> Here is the flink-config set within the k8s-operator.
>>
>> job.autoscaler.stabilization.interval: 1m
>> job.autoscaler.target.utilization: 0.6
>> job.autoscaler.target.utilization.boundary: 0.2
>> pipeline.max-parallelism: 180
>> jobmanager.scheduler: adaptive
>>
>>
>> Thank you
>> Chetas
>>
>


Autoscaling with flink-k8s-operator 1.8.0

2024-05-01 Thread Chetas Joshi
Hello,

We recently upgraded the operator to 1.8.0 to leverage the new autoscaling
features (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/).
The FlinkDeployment (application cluster) is set to flink v1_18 as well. I
am able to observe the following event being reported in the logs of the
operator.

o.a.f.k.o.l.AuditUtils [INFO ][flink/devpipeline] >>> Event  | Info
  | SCALINGREPORT   | Scaling execution enabled, begin scaling vertices:{
Vertex ID  | Parallelism 2 -> 1 | Processing capacity Infinity ->
Infinity | Target data rate 7.85}{ Vertex ID  | Parallelism 2 -> 1
| Processing capacity Infinity -> Infinity | Target data rate 0.00}{ Vertex
ID  | Parallelism 2 -> 1 | Processing capacity Infinity -> Infinity
| Target data rate 7.85}{ Vertex ID w | Parallelism 2 -> 1 |
Processing capacity 33235.72 -> 13294.29 | Target data rate 6.65}

But the in-place autoscaling is not getting triggered. My understanding is
that the autoscaler running within the k8s-operator should call the rescale
api endpoint of the FlinkDeployment (devpipeline)  with a parallelism
overrides map (vertexId => parallelism) and that should trigger a redeploy
of the jobGraph. But that is not happening. The restart of the
FlinkDeployment overrides the map (vertexId => parallelism) in the
configMap resource that stores the flink-config.

Am I missing something? How do I debug this further?

Here is the flink-config set within the k8s-operator.

job.autoscaler.stabilization.interval: 1m
job.autoscaler.target.utilization: 0.6
job.autoscaler.target.utilization.boundary: 0.2
pipeline.max-parallelism: 180
jobmanager.scheduler: adaptive


Thank you
Chetas


Re: Iceberg connector

2024-04-16 Thread Chetas Joshi
Hi Péter,

Great! Thanks! The resources are really useful.

I don't have TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE set so it is the
FlinkSource
<https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java>
that is being used instead of the IcebergSource. I see the IcebergSource is
more customizable i.e. custom splitComparator or assignerFactory can be
provided as compared to the FlinkSource.

I am reading the Iceberg table in the following manner. I am using the
Table API for the ease of expressing the SQL query, but the table needs to
be converted to a DataStream as all our downstream operators can only
work on DataStreams.

table = tableEnv
    .from(tableId)
    .select($("*"))
    .where(filters)
    .orderBy($(Columns.TS).asc())
    .limit(2500);

tableEnv.toDataStream(table).executeAndCollect();

Since the toDataStream call is responsible for creating
<https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java#L134>
an instance of the IcebergSource, I don't think there is a way to provide a
custom splitComparator. Correct me if I am wrong. Is the only way out to
use the DataStream API alone (and not the Table API) if I want to use a
custom splitComparator?
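
In case it helps the discussion, a rough sketch of what the DataStream-only
route could look like, building the FLIP-27 IcebergSource directly so a split
comparator can be supplied. Method names follow the builder and
SplitComparators classes linked above; tableLoader, filters and the execution
environment env are placeholders, so treat this as illustrative rather than
verified against a specific iceberg-flink version:

IcebergSource<RowData> source = IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .filters(filters)
        .limit(2500L)
        .streaming(false)
        // order splits by commit order via the file sequence number
        .splitComparator(SplitComparators.fileSequenceNumber())
        .build();

DataStream<RowData> stream = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "iceberg-source",
        TypeInformation.of(RowData.class));

The trade-off is giving up the Table API's SQL-style expression of the query
(select/where/orderBy) in exchange for control over split assignment.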



I have another question pertaining to reading an iceberg table using flink
as described in the code above. I want to understand why increasing the
number of task slots on the flink cluster (1 TaskManager) reduces the total
query execution time for the job vertex that reads the iceberg table splits
in the following scenario.

For a given query, the splitPlanner creates 8 splits (I am using the
default values for the iceberg read properties). Each split contains 2 data
files (data input split files) and all the data files are almost the same
size.

1. With # task slots = 4, 4 splits are submitted to the task slots in
parallel at a time. All the 4 tasks return 2500 records and the job
finishes. The other 4 splits do not need to be submitted. The job vertex
takes 2 mins (max time taken by a task among these 4 parallel tasks).
2. With # task slots = 2, 2 splits are submitted to the task slots in
parallel at a time. These 2 tasks return 2500 records and the job finishes.
The other 6 splits do not need to be submitted. The job vertex in this
scenario takes 3 mins (max time taken by a task among these 2 parallel
tasks).

I expected the job vertex to have taken almost the same amount of time in
both these scenarios because the split size is the same (same # of data
files, and each data file is of the same size) and all the tasks return 2500
records, which is the limit of the query.

Would appreciate it if you have any insights into why this behavior.

Thank you
Chetas


On Tue, Apr 16, 2024 at 12:49 PM Péter Váry 
wrote:

> Hi Chetas,
>
> See my answers below:
>
>
> On Tue, Apr 16, 2024, 06:39 Chetas Joshi  wrote:
>
>> Hello,
>>
>> I am running a batch flink job to read an iceberg table. I want to
>> understand a few things.
>>
>> 1. How does the FlinkSplitPlanner decide which fileScanTasks (I think one
>> task corresponds to one data file) need to be clubbed together within a
>> single split and when to create a new split?
>>
>
> You can take a look at the generic read properties for Iceberg tables:
> https://iceberg.apache.org/docs/nightly/configuration/#read-properties
>
> The most interesting ones for you are:
> - read.split.target-size
> - read.split.metadata-target-size
> - read.split.planning-lookback
> - read.split.open-file-cost
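
If you want to experiment with these, one way is to set them as table
properties via the Iceberg API (a minimal sketch; the values are examples,
not recommendations, and the table handle is assumed to come from your
catalog/TableLoader):

Table table = tableLoader.loadTable();   // org.apache.iceberg.Table
table.updateProperties()
        .set("read.split.target-size", "134217728")   // 128 MB target splits
        .set("read.split.open-file-cost", "4194304")   // 4 MB per-file cost
        .commit();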
>
> 2. When the number of task slots is limited, what is the sequence in which
>> the splits are assigned to the task slots?
>> For example,  if there are 4 task slots available but the number of
>> splits (source parallelism) to be read is 8, which 4 splits will be sent to
>> the task slots first? Where in the codebase does this logic exist?
>>
>
> As a general rule, there is no pre-defined order between the splits, and
> because of the parallelism, the order of the records is not defined.
>
> It is a bit of a low-level API and might be removed in the future, but you can
> define your own comparator to order the splits:
>
> https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java#L248
>
> Or you can use the fileSequenceNumber comparator to order the splits based
> on the commit order:
>
> https://github.com/apache/iceberg/blob/fbcd142c5dc1ec99792ef8edc1378e3a027fecf7/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java#L34
>
> If you have file statistics collected for the t

Iceberg connector

2024-04-15 Thread Chetas Joshi
Hello,

I am running a batch flink job to read an iceberg table. I want to
understand a few things.

1. How does the FlinkSplitPlanner decide which fileScanTasks (I think one
task corresponds to one data file) need to be clubbed together within a
single split and when to create a new split?

2. When the number of task slots is limited, what is the sequence in which
the splits are assigned to the task slots?
For example,  if there are 4 task slots available but the number of splits
(source parallelism) to be read is 8, which 4 splits will be sent to the
task slots first? Where in the codebase does this logic exist?

Would appreciate any docs, pointers to the codebase that could help me
understand the above.

Thanks
Chetas


Re: IcebergSourceReader metrics

2024-04-04 Thread Chetas Joshi
Hi Péter,
Yes, this is exactly what I was looking for. Thanks!
Chetas

On Thu, Mar 28, 2024 at 11:19 PM Péter Váry 
wrote:

> Hi Chetas,
> Are you looking for this information?
>
>   public IcebergSourceReaderMetrics(MetricGroup metrics, String fullTableName) {
>     MetricGroup readerMetrics =
>         metrics.addGroup("IcebergSourceReader").addGroup("table", fullTableName);
>     this.assignedSplits = readerMetrics.counter("assignedSplits");
>     this.assignedBytes = readerMetrics.counter("assignedBytes");
>     this.finishedSplits = readerMetrics.counter("finishedSplits");
>     this.finishedBytes = readerMetrics.counter("finishedBytes");
>     this.splitReaderFetchCalls = readerMetrics.counter("splitReaderFetchCalls");
>   }
>
>
> It could be found here:
>
> https://github.com/apache/iceberg/blob/main/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java#L32-L39
>
> Added here:
> https://github.com/apache/iceberg/pull/5554
>
> I hope this helps,
> Peter
>
> Chetas Joshi  ezt írta (időpont: 2024. márc. 29.,
> P, 2:43):
>
>> Hello,
>>
>> I am using Flink to read Iceberg (S3). I have enabled all the metrics
>> scopes in my FlinkDeployment as below
>>
>> metrics.scope.jm: flink.jobmanager
>> metrics.scope.jm.job: flink.jobmanager.job
>> metrics.scope.tm: flink.taskmanager
>> metrics.scope.tm.job: flink.taskmanager.job
>> metrics.scope.task: flink.task
>> metrics.scope.operator: flink.operator
>>
>>
>> I send these metrics to Datadog. I am specifically interested in the
>> IcebergSourceReader metrics. I could not find any information about what
>> metrics to expect here
>> <https://iceberg.apache.org/javadoc/1.1.0/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.html>.
>> In datadog as well, I could not find any metrics related to the
>> IcebergTableSourceReader. Can someone help me understand what metrics
>> associated with the IcebergTableSourceReader should be reported and what
>> metricGroup (my guess was operator) should they be part of?
>>
>> Thank you
>> Chetas
>>
>


IcebergSourceReader metrics

2024-03-28 Thread Chetas Joshi
Hello,

I am using Flink to read Iceberg (S3). I have enabled all the metrics
scopes in my FlinkDeployment as below

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator


I send these metrics to Datadog. I am specifically interested in the
IcebergSourceReader metrics. I could not find any information about what
metrics to expect here
.
In datadog as well, I could not find any metrics related to the
IcebergTableSourceReader. Can someone help me understand what metrics
associated with the IcebergTableSourceReader should be reported and what
metricGroup (my guess was operator) should they be part of?

Thank you
Chetas


FlinkSource to read iceberg table in Batch mode

2024-03-13 Thread Chetas Joshi
Hello,

I am using iceberg-flink-runtime lib (1.17-1.4.0) and running the following
code to read an iceberg table in BATCH mode.

var source = FlinkSource
.forRowData()
.streaming(false)
.env(execEnv)
.tableLoader(tableLoader)
.limit((long) operation.getLimit())
.filters(buildFilterExpression(operation))
.build();

var stream = source.map(rowDataMapper).name(operation.getName());

I am running into the following exception even though streaming = false.

Detected an UNBOUNDED source with the 'execution.runtime-mode' set to
'BATCH'. This combination is not allowed, please set the
'execution.runtime-mode' to STREAMING or AUTOMATIC

Would appreciate any pointers here.

Thank you
Chetas


High latency in reading Iceberg tables using Flink table api

2024-03-12 Thread Chetas Joshi
Hello all,

I am using the flink-iceberg-runtime lib to read an iceberg table into a
Flink datastream. I am using Glue as the catalog. I use the flink table API
to build and query an iceberg table and then use toDataStream to convert it
into a DataStream. Here is the code

Table table = streamTableEnv.from().select(..).where(...);
DataStream<Row> stream = streamTableEnv.toDataStream(table);
stream.executeAndCollect();

I have observed that the table construction and the stream construction (the
first two lines of code above) are quite slow: they take 6 to 7 seconds. The
debugging/profiling exercise has revealed some inefficiencies.
streamTableEnv.toDataStream does not use the cachingCatalog created and
attached to the streamTableEnv, so it hits the external catalog multiple
times. The toDataStream call creates a new DummyStreamExecEnv and all the
related objects again. This is where the latency is coming from, I think. Has
anyone experienced this? I would appreciate ways to overcome the slowness.

Thank you
Chetas