Re: How can we exclude operator level metrics from getting reported

2024-05-13 Thread Biao Geng
Hi Sachin,

Your setting looks fine to me. If you want to verify it, one way is to
set the log level to 'trace' and check whether log lines like 'Ignoring metric {}.{}
for reporter #{} due to filter rules.' are printed.
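
For example, a minimal log4j sketch for enabling that trace output (the
logger name here is an assumption; any logger that covers the Flink metrics
registry should surface the message):

  # Enable TRACE for the metrics registry so the filter decisions are logged.
  # Logger name is assumed; adjust it if the message is emitted elsewhere.
  logger.flinkmetrics.name = org.apache.flink.runtime.metrics
  logger.flinkmetrics.level = TRACE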

Best,
Biao Geng


Sachin Mittal wrote on Sunday, May 12, 2024 at 02:51:

> Hi
> I have a following metrics configuration:
>
> metrics.reporters: stsd
> metrics.reporter.stsd.factory.class:
> org.apache.flink.metrics.statsd.StatsDReporterFactory
> metrics.reporter.stsd.host: localhost
> metrics.reporter.stsd.port: '8125'
> metrics.reporter.stsd.filter.excludes: *.operator:*:*
>
> metrics.scope.jm: jm
> metrics.scope.jm-job: jm.
> metrics.scope.tm: tm
> metrics.scope.tm-job: tm.
> metrics.scope.task: tm...
> metrics.scope.operator: tm...
>
> I would like to exclude operator-level metrics from getting reported via
> the statsd reporter.
> Can anyone please confirm that my setting for
> metrics.reporter.stsd.filter.excludes is correct for achieving this.
>
> Thanks
> Sachin
>
>


Flink autoscaler with AWS ASG: checkpoint access issue

2024-05-13 Thread Chetas Joshi
Hello,

Set up

I am running my Flink streaming jobs (upgradeMode = stateless) on an AWS
EKS cluster. The node type used by the pods of the streaming jobs belongs to a
node group backed by an AWS ASG (auto scaling group).
The streaming jobs are FlinkDeployments managed by the
flink-k8s-operator (1.8), and I have enabled the job autoscaler.
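
For context, a trimmed sketch of roughly what such a FlinkDeployment looks
like (names, the bucket, and the exact autoscaler keys are assumptions on my
part; please verify them against the operator 1.8 docs):

  apiVersion: flink.apache.org/v1beta1
  kind: FlinkDeployment
  metadata:
    name: my-streaming-job                 # placeholder
  spec:
    serviceAccount: flink                  # SA annotated with the IAM role (IRSA)
    flinkConfiguration:
      job.autoscaler.enabled: "true"       # operator job autoscaler
      job.autoscaler.memory.tuning.enabled: "false"    # memory auto-tuning off, as noted below
      state.checkpoints.dir: s3://my-bucket/checkpoints    # placeholder
    job:
      jarURI: local:///opt/flink/usrlib/job.jar            # placeholder
      upgradeMode: stateless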

Scenario

When the Flink autoscaler scales up a Flink streaming job, new Flink TMs
are first added onto any existing nodes with available resources. If there are
not enough resources to schedule all the TM pods, the ASG adds new nodes
to the EKS cluster and the rest of the TM pods are scheduled on these new
nodes.

Issue

After the scale-up, the TM pods scheduled on the existing nodes with
available resources successfully read the checkpoint from S3; however, the TM
pods scheduled on the new nodes added by the ASG run into 403 (access denied)
while reading the same checkpoint file from the checkpoint location in S3.

Just FYI: I have disabled memory auto-tuning, so only the auto-scaling events
are in effect.

1. The IAM role associated with the service account being used by the
FlinkDeployment is as expected for the new pods (see the verification sketch
after this list).
2. I am able to reproduce this issue every single time there is a scale-up
that requires the ASG to add new nodes to the cluster.
3. If I delete the FlinkDeployment and allow the operator to restart it, it
starts successfully and stops throwing 403s.
4. I am also observing some 404 (not found) errors being reported by certain
newly added TM pods. They are looking for an older checkpoint (for example,
looking for chk10 while chk11 has already been created in S3 and chk10 has
been subsumed by chk11).
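
One possible way to verify point 1 on the newly added pods (a rough sketch,
assuming the standard EKS IRSA setup; namespace, pod, and service account
names are placeholders):

  # Check that the new TM pod actually got the IRSA credentials injected
  kubectl -n <namespace> get pod <new-tm-pod> -o yaml | grep -E 'AWS_ROLE_ARN|AWS_WEB_IDENTITY_TOKEN_FILE'

  # Check the role annotation on the service account used by the FlinkDeployment
  kubectl -n <namespace> get sa <service-account> -o yaml | grep eks.amazonaws.com/role-arn

  # If the aws CLI is available in the image, confirm which identity the pod resolves
  kubectl -n <namespace> exec -it <new-tm-pod> -- aws sts get-caller-identity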

I would appreciate it if there are any pointers on how to debug this
further.

Let me know if you need more information.

Thank you
Chetas


Proper way to modify log4j config file for kubernetes-session

2024-05-13 Thread Vararu, Vadim
Hi,

I'm trying to configure loggers in the log4j-console.properties file (which is
mounted from the host where kubernetes-session.sh is invoked and referenced
by the TM processes via -Dlog4j.configurationFile).

Is there a proper (documented) way to do that, meaning to append/modify the 
log4j config file?
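
For illustration, the kind of change in question is appending logger definitions
to conf/log4j-console.properties on the client host before starting the session
(the logger name and level below are placeholders):

  # appended to conf/log4j-console.properties before running kubernetes-session.sh
  logger.mylib.name = com.example.mylib
  logger.mylib.level = DEBUG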

Thanks,
Vadim.


Best Practices? Fault Isolation for Processing Large Number of Same-Shaped Input Kafka Topics in a Big Flink Job

2024-05-13 Thread Kevin Lam via user
Hi everyone,

I'm currently prototyping a project where we need to process a large
number of Kafka input topics (say, a couple of hundred), all of which share
the same DataType/Schema.

Our objective is to run the same Flink SQL on all of the input topics, but
I am concerned about doing this in a single large Flink SQL application for
fault isolation purposes. We'd like to limit the "blast radius" in cases of
data issues or "poison pills" in any particular Kafka topic — meaning, if
one topic runs into a problem, it shouldn’t compromise or halt the
processing of the others.
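
For concreteness, a rough sketch of the kind of shared SQL pipeline in question,
assuming the Kafka SQL connector's 'topic-pattern' option and placeholder
table/topic/column names (the sink table is assumed to be defined elsewhere):

  -- One source table covering a group of same-schema topics via a pattern.
  CREATE TABLE orders_input (
    order_id STRING,
    amount   DECIMAL(10, 2),
    ts       TIMESTAMP(3)
  ) WITH (
    'connector' = 'kafka',
    'topic-pattern' = 'orders\..*',
    'properties.bootstrap.servers' = 'broker:9092',
    'properties.group.id' = 'orders-processor',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json'
  );

  -- The same query then runs once over every topic matched by the pattern.
  INSERT INTO orders_output
  SELECT order_id, amount, ts
  FROM orders_input
  WHERE amount > 0;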

At the same time, we are concerned about the operational toil associated
with managing hundreds of Flink jobs that are really one logical
application.

Has anyone here tackled a similar challenge? If so:

   1. How did you design your solution to handle a vast number of topics
   without creating a heavy management burden?
   2. What strategies or patterns have you found effective in isolating
   issues within a specific topic so that they do not affect the processing of
   others?
   3. Are there specific configurations or tools within the Flink ecosystem
   that you'd recommend to efficiently manage this scenario?

Any examples, suggestions, or references to relevant documentation would be
helpful. Thank you in advance for your time and help!