Re: How to set hdfs configuration in flink kubernetes operator?

2023-06-22 Thread 李 琳
Hi Dongwoo,

Thank you very much for your response. It has been very helpful to me.

Your email covered how to configure the keytab and krb5.conf files for Kerberos-secured HDFS.

However, if the pod does not know the location of the HDFS NameNode, it still needs to load the Hadoop configuration files (core-site.xml / hdfs-site.xml) into the Flink environment so that Flink can find the NameNode and write data to HDFS.

In Flink on YARN mode, we can export the HADOOP_CONF_DIR environment variable and keep those files in that directory, and Flink detects the NameNode automatically. My main question is how to load the Hadoop configuration files in the Flink Kubernetes operator. If you know how to do that, please let me know.

I hope to receive your response via email. Thank you!
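[Editor's sketch, not from the thread: one commonly used approach is to ship the Hadoop configuration files in a ConfigMap, mount it through the FlinkDeployment pod template, and point HADOOP_CONF_DIR at the mount. The ConfigMap name, mount path, and file names below are assumptions, not verified against every operator version.]

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: HADOOP_CONF_DIR      # Flink reads core-site.xml / hdfs-site.xml from here
              value: /opt/hadoop/conf
          volumeMounts:
            - name: hadoop-conf
              mountPath: /opt/hadoop/conf
      volumes:
        - name: hadoop-conf
          configMap:
            name: hadoop-conf            # assumed ConfigMap holding core-site.xml and hdfs-site.xml
```

With the NameNode address available from core-site.xml, an hdfs:// checkpoint path no longer needs to embed it.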


From: Dongwoo Kim
Sent: Wednesday, June 21, 2023 7:56:52 PM
To: 李 琳
Cc: user@flink.apache.org
Subject: Re: How to set hdfs configuration in flink kubernetes operator?

Hi leilinee,

I'm not sure whether this is the best practice, but I'd like to share our 
experience configuring HDFS as checkpoint storage while using the Flink 
Kubernetes operator.
There are two steps.

Step 1) Mount krb5-conf & keytab file to flink kubernetes operator pod

You have to create a ConfigMap and a Secret for krb5.conf and the keytab 
respectively, and apply the configs below to the Flink Kubernetes operator's 
values.yaml:

operatorVolumeMounts:
  create: true
  data:
    - mountPath: /opt/flink/krb5.conf
      name: krb5-conf
      subPath: krb5.conf
    - mountPath: /opt/flink/{keytab_file}
      name: custom-keytab
      subPath: {keytab_file}
operatorVolumes:
  create: true
  data:
    - configMap:
        name: krb5-configmap
      name: krb5-conf
    - name: custom-keytab
      secret:
        secretName: custom-keytab
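[Editor's sketch, not from the thread: creating the ConfigMap and Secret referenced above could look like the commands below. The names match the values.yaml fragment; the local file paths are placeholders.]

```shell
# ConfigMap holding krb5.conf (name must match "krb5-configmap" above)
kubectl create configmap krb5-configmap --from-file=krb5.conf

# Secret holding the keytab (name must match "custom-keytab" above)
kubectl create secret generic custom-keytab --from-file={keytab_file}
```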

Step 2) Configure FlinkDeployment like below in your application

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  flinkConfiguration:
    state.checkpoint-storage: "filesystem"
    state.checkpoints.dir: "hdfs:{path_for_checkpoint}"
    security.kerberos.login.keytab: "/opt/flink/{keytab_file}"   # Absolute path in flink k8s operator pod
    security.kerberos.login.principal: "{principal_name}"
    security.kerberos.relogin.period: "5m"
    security.kerberos.krb5-conf.path: "/opt/flink/krb5.conf"     # Absolute path in flink k8s operator pod

I hope this could help your work.

Best regards
dongwoo



On Wed, Jun 21, 2023 at 7:36 PM 李 琳 <leili...@outlook.com> wrote:
Hi all,

Recently, I have been testing the Flink Kubernetes Operator. In the official 
example, the checkpoint/savepoint path is configured with a file system:


state.savepoints.dir: file:///flink-data/savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha

However, in our production environment, we use HDFS to store checkpoint data. 
I'm wondering if it's possible to store checkpoint data in the Flink Kubernetes 
Operator as well. If so, could you please guide me on how to set up HDFS 
configuration in the Flink Kubernetes Operator?

I would greatly appreciate any assistance you can provide. Thank you!
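[Editor's sketch, not from the thread: the same options accept hdfs:// URIs instead of file://. The NameNode host, port, and paths below are placeholders.]

```yaml
state.savepoints.dir: hdfs://namenode-host:8020/flink-data/savepoints
state.checkpoints.dir: hdfs://namenode-host:8020/flink-data/checkpoints
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs://namenode-host:8020/flink-data/ha
```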


Flink Kubernetes Application freezes in BATCH mode

2023-06-22 Thread Vladislav Keda
Hi all!

I'm trying to submit a Flink job in Application Mode on a Kubernetes
cluster. I see problems when an application has a large number of
operators (more than 20 identical operators): it freezes for ~6 minutes between

2023-06-21 15:46:45,082 WARN  org.apache.flink.connector.kafka.sink.KafkaSinkBuilder [] - Property [transaction.timeout.ms] not specified. Setting it to PT1H

and

2023-06-21 15:53:20,002 INFO  org.apache.flink.streaming.api.graph.StreamGraphGenerator [] - Disabled Checkpointing. Checkpointing is not supported and not needed when executing jobs in BATCH mode.

(logs in attachment)

When I set log.level=DEBUG, I see only this message every 10 seconds:

2023-06-21 14:51:30,921 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.

Could you please help me understand the cause of this problem and how to
fix it?

Thank you in advance!

Best regards,
Vladislav Keda.


flink-k8s-app.log
Description: Binary data


Data & Task distribution among the available Nodes

2023-06-22 Thread Mahmoud Awad
Hello everyone,

I am trying to understand the mechanism by which Flink distributes the data and 
the tasks among the nodes/task managers in the cluster, assuming all TMs have 
equal resources. I am using the DataSet API on my own machine.
I will try to address the issue with the following questions:

- When we first read the data from the source (text, CSV, etc.), how does Flink 
ensure a fair distribution of data from the source to the next subtask?

- Are there any preferences by which Flink will prefer one task manager over 
another (assuming all task managers have equal resources)?

- Based on what will Flink choose to deploy a specific task to a specific task 
manager?

I hope I was able to explain my point. Thank you in advance.

Best regards
Mahmoud





When does backpressure matter

2023-06-22 Thread Lu Niu
For example, suppose a Flink job reads from Kafka, does some processing, and
writes back to Kafka. Do we need to take any action when the job's Kafka
consumer lag is low or 0, but some tasks have constant backpressure? Do we
need to increase the parallelism or do some network tuning so that
backpressure stays at 0? If so, would that lead to resource overprovisioning?

Or should we only take action when Kafka lag keeps increasing while
backpressure is happening at the same time?
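[Editor's sketch, not from the thread: the Flink REST API exposes a per-vertex backpressure sample, which can help decide whether backpressure is actually sustained. The JobManager address and IDs below are placeholders.]

```shell
# Sample the backpressure status of one job vertex.
# The response reports a backpressure level (OK / LOW / HIGH) per subtask.
curl http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
```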


Best

Lu


Unsubscribe

2023-06-22 Thread wangwj03