Admin Client Configs

2023-06-21 Thread Razin Bouzar via user
Looking for some guidance on overriding Kafka admin client configs. The
partition discovery feature sometimes fails to connect to Kafka, and we'd
like to implement retries. There is no clear documentation on which
settings need to be changed. Admin client configs of note (an example of
setting them follows the list):

   - default.api.timeout.ms -- applies to client operations that do not
   specify a timeout param.
   - reconnect.backoff.max.ms
   - request.timeout.ms
   - retries
   - retry.backoff.ms
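
For example, these could be passed to the connector as plain client
properties (a sketch with illustrative values; confirm which of them your
connector version actually forwards to the AdminClient used for partition
discovery):

default.api.timeout.ms=60000
request.timeout.ms=30000
retries=5
retry.backoff.ms=1000
reconnect.backoff.max.ms=10000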


Re: How to set hdfs configuration in flink kubernetes operator?

2023-06-21 Thread Dongwoo Kim
Hi leilinee,

I'm not sure whether this is best practice, but I'd like to share our
experience configuring HDFS as checkpoint storage while using the
Flink Kubernetes Operator.
There are two steps.

*Step 1)* Mount the krb5.conf & keytab file into the Flink Kubernetes Operator pod

You have to create a ConfigMap and a Secret for krb5.conf and the keytab
respectively (example commands below), and apply the following configs to
the Flink Kubernetes Operator's *values.yaml*.
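
For example, the ConfigMap and Secret might be created like this
({keytab_file} is a placeholder, as in the YAML below):

kubectl create configmap krb5-configmap --from-file=krb5.conf
kubectl create secret generic custom-keytab --from-file={keytab_file}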

operatorVolumeMounts:
  create: true
  data:
    - mountPath: /opt/flink/krb5.conf
      name: krb5-conf
      subPath: krb5.conf
    - mountPath: /opt/flink/{keytab_file}
      name: custom-keytab
      subPath: {keytab_file}
operatorVolumes:
  create: true
  data:
    - configMap:
        name: krb5-configmap
      name: krb5-conf
    - name: custom-keytab
      secret:
        secretName: custom-keytab
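
After updating values.yaml, redeploy the operator so the mounts take
effect, e.g. (release and repo names are assumptions based on the
operator quickstart):

helm upgrade flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -f values.yaml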


*Step 2)* Configure the FlinkDeployment in your application like below

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  flinkConfiguration:
    state.checkpoint-storage: "filesystem"
    state.checkpoints.dir: "hdfs:{path_for_checkpoint}"
    security.kerberos.login.keytab: "/opt/flink/{keytab_file}"  # absolute path in flink k8s operator pod
    security.kerberos.login.principal: "{principal_name}"
    security.kerberos.relogin.period: "5m"
    security.kerberos.krb5-conf.path: "/opt/flink/krb5.conf"    # absolute path in flink k8s operator pod
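
Then apply the manifest as usual (the filename is a placeholder):

kubectl apply -f flink-deployment.yaml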


I hope this could help your work.

Best regards
dongwoo



On Wed, Jun 21, 2023 at 7:36 PM, 李 琳 wrote:

> Hi all,
>
> Recently, I have been testing the Flink Kubernetes Operator. In the
> official example, the checkpoint/savepoint path is configured with a file
> system:
>
>
> state.savepoints.dir: file:///flink-data/savepoints
> state.checkpoints.dir: file:///flink-data/checkpoints
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: file:///flink-data/ha
>
> However, in our production environment, we use HDFS to store checkpoint
> data. I'm wondering if it's possible to store checkpoint data in the Flink
> Kubernetes Operator as well. If so, could you please guide me on how to set
> up HDFS configuration in the Flink Kubernetes Operator?
>
> I would greatly appreciate any assistance you can provide. Thank you!
>


How to set hdfs configuration in flink kubernetes operator?

2023-06-21 Thread 李 琳
Hi all,

Recently, I have been testing the Flink Kubernetes Operator. In the official 
example, the checkpoint/savepoint path is configured with a file system:


state.savepoints.dir: file:///flink-data/savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha

However, in our production environment, we use HDFS to store checkpoint data. 
I'm wondering if it's possible to store checkpoint data in the Flink Kubernetes 
Operator as well. If so, could you please guide me on how to set up HDFS 
configuration in the Flink Kubernetes Operator?

I would greatly appreciate any assistance you can provide. Thank you!


Re: Changelog fail leads to job fail regardless of tolerable-failed-checkpoints config

2023-06-21 Thread Dongwoo Kim
Hi Yanfei,
Thanks for the reply.

So uploading the changelog doesn't count as part of the checkpointing
phase, and I now understand that
"execution.checkpointing.tolerable-failed-checkpoints" is not related to
changelog failures.
However, how about adding something like a tolerable-failed-changelog
configuration?
It could allow the system to keep the changelog in memory when an upload
failure occurs and attempt the upload again during the next cycle.
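
Something like the following, purely illustrative (the second option does
not exist today; the name just mirrors the existing checkpoint option):

execution.checkpointing.tolerable-failed-checkpoints: 5
# hypothetical, proposed here:
execution.checkpointing.tolerable-failed-changelog-uploads: 5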

We believe this could help us avoid unnecessary application restarts caused
by temporary S3 issues.
Are there any expected side effects of this approach?
Currently we are managing this issue with a higher number of retries and a
higher timeout threshold for the upload process, but it would be great if
we could just tolerate a configured number of changelog failures.
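
For reference, our current mitigation looks roughly like this in
flink-conf.yaml (a sketch assuming the changelog DSTL upload options;
verify the key names against your Flink version):

dstl.dfs.upload.retry-policy: fixed
dstl.dfs.upload.max-attempts: 10
dstl.dfs.upload.timeout: 5 min
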
Thanks in advance

Best regards
dongwoo

On Wed, Jun 21, 2023 at 12:38 PM, Yanfei Lei wrote:

> Hi Dongwoo,
>
> State changelogs are continuously uploaded to durable storage when the
> Changelog state backend is enabled. In other words, it also persists
> data **outside the checkpoint phase**, and an exception at that time
> will directly cause the job to fail. Only exceptions in the checkpoint
> phase are counted as checkpoint failures.
>
> On Tue, Jun 20, 2023 at 18:31, Dongwoo Kim wrote:
> >
> > Hello all, I have a question about changelog persist failures.
> > When the changelog persist process fails due to an S3 timeout, it seems
> > to lead to job failure regardless of our
> > "execution.checkpointing.tolerable-failed-checkpoints" configuration
> > being set to 5, with this log:
> >
> > Caused by: java.io.IOException: The upload for 522 has already failed
> > previously
> >
> > Upon digging into the source code, I observed that Flink consistently
> > checks the sequence number against the latest failed sequence number,
> > resulting in an IOException. I am curious about the reasoning behind this
> > check, as it seems to interfere with the "tolerable-failed-checkpoints"
> > configuration working as expected.
> > Can anyone explain the goal behind this design?
> > Additionally, I'd like to propose a potential solution: what if we
> > adjusted this section to allow failed changelogs to be uploaded on
> > subsequent attempts, up to the "tolerable-failed-checkpoints" limit,
> > before declaring the job failed?
> >
> > Thanks in advance
> >
> > Best regards
> > dongwoo
> >
>
>
> --
> Best,
> Yanfei
>


RocksDB State Backend GET returns null intermittently

2023-06-21 Thread Prabhu Joseph
Hi,

A RocksDB State Backend GET call on a key that was PUT into state about
100 ms earlier intermittently returns null. The issue never happens with
the HashMap state backend. We are trying to increase the block cache size
and write buffer size, and to enable bloom filters, as per the doc:
https://flink.apache.org/2021/01/18/using-rocksdb-state-backend-in-apache-flink-when-and-how/
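
For reference, those tunables map to Flink configuration along these lines
(values are illustrative; the keys are RocksDB configurable options and
should be checked against your Flink version):

state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.use-bloom-filter: true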

Any ideas on what could be wrong or how to debug this?

Thanks,
Prabhu Joseph


Re: Default Log4j properties in Native Kubernetes

2023-06-21 Thread Yang Wang
I assume you are using "*bin/flink run-application*" to submit a Flink
application to the K8s cluster. Then you can simply update your local
log4j-console.properties; it will be shipped and mounted into the
JobManager/TaskManager pods via a ConfigMap.
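
For example, tweaking the log level in the local conf directory before
submitting (a minimal sketch; the property names follow the
log4j-console.properties shipped with the Flink distribution):

# $FLINK_HOME/conf/log4j-console.properties on the submitting client
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender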

Best,
Yang

On Tue, Jun 20, 2023 at 22:15, Vladislav Keda wrote:

> Hi all again!
>
> Please tell me if you can answer my question, thanks.
>
> ---
>
> Best Regards,
> Vladislav Keda
>
> On Fri, Jun 16, 2023 at 16:12, Vladislav Keda <
> vladislav.k...@glowbyteconsulting.com> wrote:
>
>> Hi all!
>>
>> Is it possible to change Flink *log4j-console.properties* in Native
>> Kubernetes (for example, in Kubernetes Application mode) without rebuilding
>> the application docker image?
>>
>> I was trying to inject a .sh script call (in the attachment) before
>> /docker-entrypoint.sh, but this workaround did not work (k8s gives me an
>> exception that the log4j* files are write-locked because there is a
>> configmap over them).
>>
>> Is there another way to change log4j* files?
>>
>> Thank you very much in advance!
>>
>> Best Regards,
>> Vladislav Keda
>>
>