Re: Incompatible KafkaProducer version

2023-09-11 Thread Krzysztof Jankiewicz
Hi, Hang.

Many changes were made to the TransactionManager in the Kafka client in
2022 (e.g.
https://github.com/apache/kafka/commit/3ea7b418fb3d7e9fc74c27751c1b02b04877f197
).

Version 3.2.3 was the last one in which the TransactionManager class
contained the attributes (e.g., topicPartitionBookkeeper) referenced by
flink-connector-kafka (1.17.1).

Thanks once again.
Krzysztof

On Mon, 11 Sep 2023 at 11:24, Hang Ruan wrote:

> Hi, Krzysztof.
>
> I find that this part has been changed in PR[1] when updating the kafka
> client version to 3.4.0.
> This fix is not released yet. Maybe you can package and check it by
> yourself.
>
> Best,
> Hang
>
> [1] https://github.com/apache/flink-connector-kafka/pull/11
>
> On Sun, 10 Sep 2023 at 21:52, Krzysztof Jankiewicz wrote:
>
>> Hi,
>>
>> I am currently working on a simple application that requires exactly-once
>> end-to-end guarantee.
>>
>> I am reading data from Kafka and writing it back to Kafka.
>>
>> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
>> everything works fine.
>> Here's the relevant code:
>>
>> KafkaSink sink = KafkaSink.builder()
>> . . .
>> .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>> . . .
>> .build();
>>
>> Unfortunately, when I switch to DeliveryGuarantee.EXACTLY_ONCE, I
>> encounter the following error during error handling (High Availability mode
>> in k8s):
>>
>> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
>>   at
>> org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
>> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>> . . .
>> Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
>>
>> The code causing this issue is as follows
>> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
>>
>> Object transactionManager = this.getTransactionManager();
>> synchronized(transactionManager) {
>> Object topicPartitionBookkeeper =
>> getField(transactionManager, "topicPartitionBookkeeper");
>> transitionTransactionManagerStateTo(transactionManager,
>> "INITIALIZING");
>> invoke(topicPartitionBookkeeper, "reset");
>> setField(transactionManager, "producerIdAndEpoch",
>> createProducerIdAndEpoch(producerId, epoch));
>>
>> I am using Apache Flink 1.17.1 and Apache Kafka Client
>> (org.apache.kafka:kafka-clients) 3.5.1.
>> I have examined the code of
>> org.apache.kafka.clients.producer.internals.TransactionManager, which is
>> used by org.apache.kafka.clients.producer.KafkaProducer.
>> I can see the producerIdAndEpoch field, but there is no
>> topicPartitionBookkeeper field.
>>
>> Could you please advise which version of KafkaProducer is compatible with
>> the flink-connector-kafka? And am I missing something in my configuration?
>>
>> Kind regards
>> Krzysztof
>>
>
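
The NoSuchFieldException in this thread comes from looking up a private field by name through reflection. Below is a minimal, self-contained sketch of that failure mode. OldManager and NewManager are hypothetical stand-ins, not Kafka's real classes, and hasField is an illustrative helper that is not part of flink-connector-kafka.

```java
public class FieldProbe {
    // Hypothetical stand-in for an older client class that still has the field.
    public static class OldManager {
        private final Object topicPartitionBookkeeper = new Object();
        private final Object producerIdAndEpoch = new Object();
    }

    // Hypothetical stand-in for a newer client class where the field is gone.
    public static class NewManager {
        private final Object producerIdAndEpoch = new Object();
    }

    /** Returns true if the class or one of its superclasses declares the named field. */
    public static boolean hasField(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                c.getDeclaredField(name);
                return true;
            } catch (NoSuchFieldException e) {
                // Not declared here; continue with the superclass.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasField(OldManager.class, "topicPartitionBookkeeper"));
        System.out.println(hasField(NewManager.class, "topicPartitionBookkeeper"));
    }
}
```

A check of this kind, pointed at the real class on the job's classpath, could be used to confirm whether a given kafka-clients version still has the field the connector expects.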


Re: How to get the applicationid in flink-metrics

2023-09-11 Thread 吴先生
Did it work? How did you use it?


吴先生
15951914...@163.com

 Original message 
| From | allanqinjy |
| Sent | 30 August 2023 20:02 |
| To | user-zh@flink.apache.org |
| Subject | Re: How to get the applicationid in flink-metrics |
Thanks a lot, I'll change the code tomorrow and give it a try.
 Original message 
| From | Feng Jin |
| Sent | 30 August 2023 19:42 |
| To | user-zh |
| Subject | Re: How to get the applicationid in flink-metrics |
hi,

You can try reading the JVM environment variable _APP_ID.
System.getenv(YarnConfigKeys.ENV_APP_ID);

https://github.com/apache/flink/blob/6c9bb3716a3a92f3b5326558c6238432c669556d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java#L28


Best,
Feng

On Wed, Aug 30, 2023 at 7:14 PM allanqinjy  wrote:

hi,
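I have a question: when reporting metrics to Prometheus, the jobname gets a
randomly generated suffix, and in the source code it is a new AbstractID().
Is there a way to get the applicationid of the job being reported at this point?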
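
A minimal sketch of the suggestion above, using only the JDK. It assumes the variable is named _APP_ID, as stated in the reply, and that it is set in the process environment. AppIdLookup and the "unknown" fallback are illustrative names, not part of Flink.

```java
import java.util.Map;
import java.util.Optional;

public class AppIdLookup {
    // Name of the environment variable, as given in the reply above.
    public static final String ENV_APP_ID = "_APP_ID";

    /** Looks up the application id in the given environment map, ignoring blank values. */
    public static Optional<String> applicationId(Map<String, String> env) {
        return Optional.ofNullable(env.get(ENV_APP_ID))
                .map(String::trim)
                .filter(s -> !s.isEmpty());
    }

    public static void main(String[] args) {
        // Outside YARN the variable is normally absent, so fall back to a placeholder.
        String id = applicationId(System.getenv()).orElse("unknown");
        System.out.println("applicationId=" + id);
    }
}
```

Taking the environment as a map keeps the lookup testable without changing the real process environment.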


Re: e2e tests with flink

2023-09-11 Thread Feng Jin
Hi Oscar

You can refer to the unit tests in flink-connector-kafka.

https://github.com/apache/flink-connector-kafka/blob/d6525c1481fc2d2821f361d2d5ce48f97221dd74/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java#L152


Best,
Feng

On Mon, Sep 11, 2023 at 5:39 PM Oscar Perez via user 
wrote:

> Hi,
>
> I have a Flink job which I want to test e2e.
>
> In the test I start a Flink MiniCluster, which reads from Kafka topics in
> Testcontainers. The problem I'm facing is that some topics have their
> starting offset set to latest, and I want to publish these messages just
> after the job has completely started, so that they can be read.
>
> Is there a clean solution to send the payment event after the job has
> started? Currently I'm using Thread.sleep for that, but I would like to
> await on something and don't know what the trigger for that would be.
>
> Regards,
> Oscar
>
>
>
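
One common replacement for a fixed Thread.sleep is to poll a readiness condition with a timeout. The sketch below uses only the JDK, and Await.until is a hypothetical helper, not a Flink API. In a Flink test the condition might check the job status or a metric, which is an assumption here. A job reporting that it is running does not by itself guarantee that the Kafka source has already resolved its latest offsets.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class Await {
    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    public static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true roughly 200 ms after start.
        boolean ready = until(
                () -> System.currentTimeMillis() - start > 200,
                Duration.ofSeconds(5),
                Duration.ofMillis(20));
        System.out.println("ready=" + ready);
    }
}
```

The test would publish its messages only after until(...) returns true, and fail fast if it returns false.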


Reading parquet files using Flink

2023-09-11 Thread Hou, Lijuan via user
Hi team,

Is there any defined way to read Parquet files in Flink 1.17.1? I did some
searching and found something for Flink 1.13.6 using ParquetRowInputFormat,
which no longer seems to exist in Flink 1.17.1. Are there any examples of the
implementation? Thanks!

Best,
Lijuan



Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Gyula Fóra
You don’t need it but you can really mess up clusters by rolling back CRD
changes…

On Mon, 11 Sep 2023 at 19:42, Evgeniy Lyutikov  wrote:

> Why do we need to use the latest CRD version with an older operator version?
> --
> *From:* Gyula Fóra
> *Sent:* 12 September 2023 0:36:26
>
> *To:* Evgeniy Lyutikov
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink kubernets operator delete HA metadata after resuming
> from suspend
>
> Do not change the CRD but you can roll back the operator itself I believe
>
> Gyula
>
> On Mon, 11 Sep 2023 at 18:52, Evgeniy Lyutikov 
> wrote:
>
>> Is it safe to roll back the operator version and replace the CRDs with the
>> old ones?
>> --
>> *From:* Evgeniy Lyutikov
>> *Sent:* 11 September 2023 23:50:26
>> *To:* Gyula Fóra
>>
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Flink kubernets operator delete HA metadata after resuming
>> from suspend
>>
>>
>> Hi!
>> No, no one could have restarted the JobManager.
>> I monitored the pods in real time; they were all deleted on suspend, as
>> expected.
>>
>>
>> --
>> *From:* Gyula Fóra
>> *Sent:* 11 September 2023 20:34:52
>> *To:* Evgeniy Lyutikov
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Flink kubernets operator delete HA metadata after resuming
>> from suspend
>>
>> Hi!
>>
>> I could not reproduce your issue, last-state suspend/restore seems to
>> work as before.
>> However these 2 logs seem very suspicious:
>>
>> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
>> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
>> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
>> ][rec-job/rec-job] JobManager is being deployed
>>
>> Looks like after suspending (and deleting the JobManager Deployment)
>> somebody restarted the JobManager manually. Is that possible?
>>
>> Cheers,
>> Gyula
>>
>> On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov 
>> wrote:
>>
>>> Hi all!
>>> After updating the operator to version 1.6.0, suspending and resuming
>>> Flink jobs stopped working.
>>> When the job resumes, the high availability metadata is removed.
>>>
>>> Suspend job:
>>> 2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
>>> change(s) detected (Diff: FlinkDeploymentSpec[job.state : running ->
>>> suspended]), starting reconciliation.
>>> 2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
>>> ][rec-job/rec-job] Job is in running state, ready for upgrade with
>>> LAST_STATE
>>> 2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Event  | Info| SUSPENDED   | Suspending
>>> existing deployment.
>>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Deleting cluster with Foreground propagation
>>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO
>>> ][rec-job/rec-job] Deleting JobManager deployment while preserving HA
>>> metadata.
>>> 2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown...
>>> 2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown... (5s)
>>> 2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown... (10s)
>>> 2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown... (15s)
>>> 2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown... (20s)
>>> 2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Cluster shutdown completed.
>>> 2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
>>> (job) has been suspended
>>> 2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
>>> [INFO ][rec-job/rec-job] Resource fully reconciled, nothing to do...
>>>
>>> Resume:
>>> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
>>> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
>>> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
>>> ][rec-job/rec-job] JobManager is being deployed
>>> 2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
>>> (job) has been suspended
>>> 2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
>>> change(s) detected (Diff: FlinkDeploymentSpec[job.state : suspended ->
>>> running]), starting reconciliation.
>>> 2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is
>>> being upgraded
>>> 

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Evgeniy Lyutikov
Why do we need to use the latest CRD version with an older operator version?


From: Gyula Fóra
Sent: 12 September 2023 0:36:26
To: Evgeniy Lyutikov
Cc: user@flink.apache.org
Subject: Re: Flink kubernets operator delete HA metadata after resuming from
suspend

Do not change the CRD but you can roll back the operator itself I believe

Gyula

On Mon, 11 Sep 2023 at 18:52, Evgeniy Lyutikov <eblyuti...@avito.ru> wrote:

Is it safe to roll back the operator version and replace the CRDs with the old
ones?


From: Evgeniy Lyutikov <eblyuti...@avito.ru>
Sent: 11 September 2023 23:50:26
To: Gyula Fóra

Cc: user@flink.apache.org
Subject: Re: Flink kubernets operator delete HA metadata after resuming from
suspend


Hi!

No, no one could have restarted the JobManager.
I monitored the pods in real time; they were all deleted on suspend, as
expected.




From: Gyula Fóra <gyula.f...@gmail.com>
Sent: 11 September 2023 20:34:52
To: Evgeniy Lyutikov
Cc: user@flink.apache.org
Subject: Re: Flink kubernets operator delete HA metadata after resuming from
suspend

Hi!

I could not reproduce your issue, last-state suspend/restore seems to work as 
before.
However these 2 logs seem very suspicious:

2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] JobManager is being deployed

Looks like after suspending (and deleting the JobManager Deployment) somebody 
restarted the JobManager manually. Is that possible?

Cheers,
Gyula

On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov <eblyuti...@avito.ru> wrote:

Hi all!
After updating the operator to version 1.6.0, suspending and resuming Flink jobs
stopped working.
When the job resumes, the high availability metadata is removed.

Suspend job:
2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
FlinkDeploymentSpec[job.state : running -> suspended]), starting reconciliation.
2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO 
][rec-job/rec-job] Job is in running state, ready for upgrade with LAST_STATE
2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SUSPENDED   | Suspending existing deployment.
2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Deleting cluster with Foreground propagation
2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO ][rec-job/rec-job] 
Deleting JobManager deployment while preserving HA metadata.
2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (5s)
2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (10s)
2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (15s)
2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (20s)
2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| SUSPENDED   | The resource (job) has been suspended
2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
][rec-job/rec-job] Resource fully reconciled, nothing to do...

Resume:
2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] JobManager is being deployed
2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| SUSPENDED   | The resource (job) has been suspended
2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
FlinkDeploymentSpec[job.state : suspended -> running]), starting reconciliation.
2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| UPGRADING   | The resource is being upgraded
2023-09-11 06:02:07,649 o.a.f.k.o.r.d.ApplicationReconciler [INFO 
][rec-job/rec-job] Deleting deployment with terminated application before new 
deployment
2023-09-11 06:02:07,649 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Deleting cluster with Foreground propagation
2023-09-11 06:02:07,649 

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Gyula Fóra
Do not change the CRD but you can roll back the operator itself I believe

Gyula

On Mon, 11 Sep 2023 at 18:52, Evgeniy Lyutikov  wrote:

> Is it safe to roll back the operator version and replace the CRDs with the
> old ones?
> --
> *From:* Evgeniy Lyutikov
> *Sent:* 11 September 2023 23:50:26
> *To:* Gyula Fóra
>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink kubernets operator delete HA metadata after resuming
> from suspend
>
>
> Hi!
> No, no one could have restarted the JobManager.
> I monitored the pods in real time; they were all deleted on suspend, as
> expected.
>
>
> --
> *From:* Gyula Fóra
> *Sent:* 11 September 2023 20:34:52
> *To:* Evgeniy Lyutikov
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink kubernets operator delete HA metadata after resuming
> from suspend
>
> Hi!
>
> I could not reproduce your issue, last-state suspend/restore seems to work
> as before.
> However these 2 logs seem very suspicious:
>
> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][rec-job/rec-job] JobManager is being deployed
>
> Looks like after suspending (and deleting the JobManager Deployment)
> somebody restarted the JobManager manually. Is that possible?
>
> Cheers,
> Gyula
>
> On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov 
> wrote:
>
>> Hi all!
>> After updating the operator to version 1.6.0, suspending and resuming
>> Flink jobs stopped working.
>> When the job resumes, the high availability metadata is removed.
>>
>> Suspend job:
>> 2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
>> change(s) detected (Diff: FlinkDeploymentSpec[job.state : running ->
>> suspended]), starting reconciliation.
>> 2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
>> ][rec-job/rec-job] Job is in running state, ready for upgrade with
>> LAST_STATE
>> 2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Event  | Info| SUSPENDED   | Suspending
>> existing deployment.
>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Deleting cluster with Foreground propagation
>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO
>> ][rec-job/rec-job] Deleting JobManager deployment while preserving HA
>> metadata.
>> 2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown...
>> 2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown... (5s)
>> 2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown... (10s)
>> 2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown... (15s)
>> 2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown... (20s)
>> 2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Cluster shutdown completed.
>> 2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
>> (job) has been suspended
>> 2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
>> [INFO ][rec-job/rec-job] Resource fully reconciled, nothing to do...
>>
>> Resume:
>> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
>> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
>> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
>> ][rec-job/rec-job] JobManager is being deployed
>> 2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
>> (job) has been suspended
>> 2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
>> change(s) detected (Diff: FlinkDeploymentSpec[job.state : suspended ->
>> running]), starting reconciliation.
>> 2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is
>> being upgraded
>> 2023-09-11 06:02:07,649 o.a.f.k.o.r.d.ApplicationReconciler [INFO
>> ][rec-job/rec-job] Deleting deployment with terminated application before
>> new deployment
>> 2023-09-11 06:02:07,649 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Deleting cluster with Foreground propagation
>> 2023-09-11 06:02:07,649 o.a.f.k.o.s.NativeFlinkService [INFO
>> ][rec-job/rec-job] Deleting JobManager deployment and HA metadata.
>> 2023-09-11 06:02:07,691 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown...
>> 2023-09-11 

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Evgeniy Lyutikov
Is it safe to roll back the operator version and replace the CRDs with the old
ones?


From: Evgeniy Lyutikov
Sent: 11 September 2023 23:50:26
To: Gyula Fóra
Cc: user@flink.apache.org
Subject: Re: Flink kubernets operator delete HA metadata after resuming from
suspend


Hi!

No, no one could have restarted the JobManager.
I monitored the pods in real time; they were all deleted on suspend, as
expected.




From: Gyula Fóra
Sent: 11 September 2023 20:34:52
To: Evgeniy Lyutikov
Cc: user@flink.apache.org
Subject: Re: Flink kubernets operator delete HA metadata after resuming from
suspend

Hi!

I could not reproduce your issue, last-state suspend/restore seems to work as 
before.
However these 2 logs seem very suspicious:

2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] JobManager is being deployed

Looks like after suspending (and deleting the JobManager Deployment) somebody 
restarted the JobManager manually. Is that possible?

Cheers,
Gyula

On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov <eblyuti...@avito.ru> wrote:

Hi all!
After updating the operator to version 1.6.0, suspending and resuming Flink jobs
stopped working.
When the job resumes, the high availability metadata is removed.

Suspend job:
2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
FlinkDeploymentSpec[job.state : running -> suspended]), starting reconciliation.
2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO 
][rec-job/rec-job] Job is in running state, ready for upgrade with LAST_STATE
2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SUSPENDED   | Suspending existing deployment.
2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Deleting cluster with Foreground propagation
2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO ][rec-job/rec-job] 
Deleting JobManager deployment while preserving HA metadata.
2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (5s)
2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (10s)
2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (15s)
2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (20s)
2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| SUSPENDED   | The resource (job) has been suspended
2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
][rec-job/rec-job] Resource fully reconciled, nothing to do...

Resume:
2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] JobManager is being deployed
2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| SUSPENDED   | The resource (job) has been suspended
2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
FlinkDeploymentSpec[job.state : suspended -> running]), starting reconciliation.
2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| UPGRADING   | The resource is being upgraded
2023-09-11 06:02:07,649 o.a.f.k.o.r.d.ApplicationReconciler [INFO 
][rec-job/rec-job] Deleting deployment with terminated application before new 
deployment
2023-09-11 06:02:07,649 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Deleting cluster with Foreground propagation
2023-09-11 06:02:07,649 o.a.f.k.o.s.NativeFlinkService [INFO ][rec-job/rec-job] 
Deleting JobManager deployment and HA metadata.
2023-09-11 06:02:07,691 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Deleting Kubernetes HA metadata
2023-09-11 06:02:07,820 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:02:07,831 

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Evgeniy Lyutikov
Hi!

No, no one could have restarted the JobManager.
I monitored the pods in real time; they were all deleted on suspend, as
expected.




From: Gyula Fóra
Sent: 11 September 2023 20:34:52
To: Evgeniy Lyutikov
Cc: user@flink.apache.org
Subject: Re: Flink kubernets operator delete HA metadata after resuming from
suspend

Hi!

I could not reproduce your issue, last-state suspend/restore seems to work as 
before.
However these 2 logs seem very suspicious:

2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] JobManager is being deployed

Looks like after suspending (and deleting the JobManager Deployment) somebody 
restarted the JobManager manually. Is that possible?

Cheers,
Gyula

On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov <eblyuti...@avito.ru> wrote:

Hi all!
After updating the operator to version 1.6.0, suspending and resuming Flink jobs
stopped working.
When the job resumes, the high availability metadata is removed.

Suspend job:
2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
FlinkDeploymentSpec[job.state : running -> suspended]), starting reconciliation.
2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO 
][rec-job/rec-job] Job is in running state, ready for upgrade with LAST_STATE
2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SUSPENDED   | Suspending existing deployment.
2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Deleting cluster with Foreground propagation
2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO ][rec-job/rec-job] 
Deleting JobManager deployment while preserving HA metadata.
2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (5s)
2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (10s)
2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (15s)
2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown... (20s)
2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| SUSPENDED   | The resource (job) has been suspended
2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
][rec-job/rec-job] Resource fully reconciled, nothing to do...

Resume:
2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][rec-job/rec-job] JobManager is being deployed
2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| SUSPENDED   | The resource (job) has been suspended
2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: 
FlinkDeploymentSpec[job.state : suspended -> running]), starting reconciliation.
2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| UPGRADING   | The resource is being upgraded
2023-09-11 06:02:07,649 o.a.f.k.o.r.d.ApplicationReconciler [INFO 
][rec-job/rec-job] Deleting deployment with terminated application before new 
deployment
2023-09-11 06:02:07,649 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Deleting cluster with Foreground propagation
2023-09-11 06:02:07,649 o.a.f.k.o.s.NativeFlinkService [INFO ][rec-job/rec-job] 
Deleting JobManager deployment and HA metadata.
2023-09-11 06:02:07,691 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Deleting Kubernetes HA metadata
2023-09-11 06:02:07,820 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:02:07,831 o.a.f.k.o.s.AbstractFlinkService [INFO 
][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:07,975 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] 
>>> Status | Info| UPGRADING   | The resource is being upgraded
2023-09-11 06:02:07,987 o.a.f.k.o.l.AuditUtils [INFO 

Re: observedGeneration field in FlinkDeployment

2023-09-11 Thread Tony Chen
I think it should be fine for now. I can compare the following 2 fields to
check if the spec has changed:

   - metadata.generation: 4
   - status.reconciliationStatus.lastStableSpec: '{"resource_metadata":
   {"metadata": {"generation": 2}'
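
The comparison described above can be sketched with the JDK alone. GenerationCheck is a hypothetical helper, not part of the operator. The regex picks the first "generation" key in the string, which is an assumption that holds for the small example in this thread. For real status payloads a proper JSON parser is the safer choice.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GenerationCheck {
    // Matches the first "generation": <number> pair in the JSON text.
    private static final Pattern GENERATION =
            Pattern.compile("\"generation\"\\s*:\\s*(\\d+)");

    /** Extracts the generation recorded in the reconciled-spec JSON, if any. */
    public static OptionalLong reconciledGeneration(String specJson) {
        Matcher m = GENERATION.matcher(specJson);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    /** Treats the spec as changed when the generations differ or none was recorded. */
    public static boolean specChanged(long metadataGeneration, String specJson) {
        OptionalLong reconciled = reconciledGeneration(specJson);
        return !reconciled.isPresent() || reconciled.getAsLong() != metadataGeneration;
    }

    public static void main(String[] args) {
        String json = "{\"spec\":{},\"resource_metadata\":{\"apiVersion\":"
                + "\"flink.apache.org/v1beta1\",\"metadata\":{\"generation\":2},"
                + "\"firstDeployment\":true}}";
        System.out.println(specChanged(4, json)); // generations differ
        System.out.println(specChanged(2, json)); // generations match
    }
}
```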


On Sat, Sep 9, 2023 at 2:48 AM Gyula Fóra  wrote:

> Actually, I just realized, the last fully reconciled spec generation
> should be written into a metadata JSON inside the lastReconciledSpec. So
> this is already available.
>
> For example:
> lastReconciledSpec: '{"spec":{...},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
>
> It's a bit hidden but it should do the trick :)
> We could discuss moving this to a more standardized status field if you
> think that's worth the effort.
>
> Gyula
>
> On Sat, Sep 9, 2023 at 7:04 AM Gyula Fóra  wrote:
>
>> Hi!
>> The lastReconciledSpec field serves a similar purpose. We also use the
>> generation in parts of the logic, but not generically as an observed
>> generation.
>>
>> Could you give an example where this would be useful in addition to what
>> we already have?
>>
>> Thanks
>> Gyula
>>
>> On Sat, 9 Sep 2023 at 02:17, Tony Chen  wrote:
>>
>>> Hi Flink Community,
>>>
>>> I noticed that there is no status.observedGeneration field in the
>>> FlinkDeployment spec. The closest field to this is
>>> status.reconciliationStatus.lastReconciledSpec. Are there plans to add the
>>> observedGeneration field in the spec? This field will be helpful in
>>> detecting changes in the FlinkDeployment spec.
>>>
>>> Thanks,
>>> Tony
>>>
>>> --
>>>
>>> 
>>>
>>> Tony Chen
>>>
>>> Software Engineer
>>>
>>> Menlo Park, CA
>>>
>>> Don't copy, share, or use this email without permission. If you received
>>> it by accident, please let us know and then delete it right away.
>>>
>>

-- 



Tony Chen

Software Engineer

Menlo Park, CA

Don't copy, share, or use this email without permission. If you received it
by accident, please let us know and then delete it right away.


Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Gyula Fóra
Hi!

I could not reproduce your issue; last-state suspend/restore seems to work
as before.
However, these two log lines look very suspicious:

2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO ][rec-job/rec-job] JobManager is being deployed

It looks as if, after the suspend (and the deletion of the JobManager
Deployment), somebody restarted the JobManager manually. Is that possible?

Cheers,
Gyula

On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov wrote:

> Hi all!
> After updating the operator to version 1.6.0, suspending and resuming
> Flink jobs stopped working.
> When a job resumes, the high-availability metadata is removed.
>
> Suspend job:
> 2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
> change(s) detected (Diff: FlinkDeploymentSpec[job.state : running ->
> suspended]), starting reconciliation.
> 2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
> ][rec-job/rec-job] Job is in running state, ready for upgrade with
> LAST_STATE
> 2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Event  | Info| SUSPENDED   | Suspending
> existing deployment.
> 2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Deleting cluster with Foreground propagation
> 2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO
> ][rec-job/rec-job] Deleting JobManager deployment while preserving HA
> metadata.
> 2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown...
> 2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown... (5s)
> 2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown... (10s)
> 2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown... (15s)
> 2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown... (20s)
> 2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Cluster shutdown completed.
> 2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
> (job) has been suspended
> 2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
> [INFO ][rec-job/rec-job] Resource fully reconciled, nothing to do...
>
> Resume:
> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][rec-job/rec-job] JobManager is being deployed
> 2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
> (job) has been suspended
> 2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
> change(s) detected (Diff: FlinkDeploymentSpec[job.state : suspended ->
> running]), starting reconciliation.
> 2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is
> being upgraded
> 2023-09-11 06:02:07,649 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][rec-job/rec-job] Deleting deployment with terminated application before
> new deployment
> 2023-09-11 06:02:07,649 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Deleting cluster with Foreground propagation
> 2023-09-11 06:02:07,649 o.a.f.k.o.s.NativeFlinkService [INFO
> ][rec-job/rec-job] Deleting JobManager deployment and HA metadata.
> 2023-09-11 06:02:07,691 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown...
> 2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Cluster shutdown completed.
> 2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Deleting Kubernetes HA metadata
> 2023-09-11 06:02:07,820 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown...
> 2023-09-11 06:02:07,831 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Cluster shutdown completed.
> 2023-09-11 06:02:07,975 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is
> being upgraded
> 2023-09-11 06:02:07,987 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Event  | Info| SUBMIT  | Starting
> deployment
> 2023-09-11 06:02:07,987 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Deploying application cluster requiring last-state from
> HA metadata
> 2023-09-11 06:02:07,999 

Feedback on Testing Guidelines for Data Stream Processing Applications

2023-09-11 Thread Alexandre Strapacao Guedes Vianna
Greetings,

I hope this message finds you well.


As part of my PhD research, I've developed guidelines tailored to assist
professionals in planning testing of Data Stream Processing applications.


If you've worked directly with stream processing or have experience with
similar systems, like micro-batch or high-performance ETL systems, your
insights are valuable to us!


Please take a moment to contribute to our survey.


Your feedback will:

✅ Help refine and validate these guidelines.

✅ Promote a tighter collaboration between industry and academia.

✅ Ensure that our research benefits the wider community.


The improved guidelines will be shared with the community, reinforcing our
commitment to collaborative growth and innovation.


Thank you for supporting this bridge between industry experience and
academic research. Your voice matters!


Please refer to our previous studies if you'd like to delve deeper into
this subject: Exploratory Study and Grey Literature Review



Best Regards,

Alexandre

PS: Kindly consider sharing our survey with colleagues or peers who might
offer valuable insights for our research:
https://forms.gle/qQ4YPoyrnR4wS92J9


e2e tests with flink

2023-09-11 Thread Oscar Perez via user
Hi,

I have a Flink job that I want to test end to end.

In the test I start a Flink MiniCluster, and the job reads from Kafka topics
running in Testcontainers. The problem is that some topics use latest as
their starting offset, so I want to publish the test messages only after the
job has fully started; otherwise they will not be read.

Is there a clean way to send the payment event after the job has started?
Currently I'm using Thread.sleep for that, but I would rather await
something; I just don't know what the trigger would be.

Regards,
Oscar
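
One way to replace the fixed Thread.sleep described above is a small polling
helper that waits until some readiness condition holds or a timeout expires.
The sketch below uses only the JDK; the class name AwaitCondition and its
method are invented for this example, and which condition actually signals
"the job is ready to read" is an assumption the test author has to settle.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical test utility; not part of Flink or Testcontainers.
public class AwaitCondition {

    /**
     * Polls the condition until it returns true.
     * Throws TimeoutException if it is still false once the timeout elapsed.
     */
    public static void await(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true roughly 200 ms after start.
        await(() -> System.currentTimeMillis() - start >= 200,
                Duration.ofSeconds(5), Duration.ofMillis(20));
        System.out.println("condition met, safe to publish test messages");
    }
}
```

In a Flink test the condition could, for instance, check that the JobClient
returned by executeAsync reports JobStatus.RUNNING. That is only a candidate
trigger: a RUNNING job does not by itself guarantee that the Kafka source has
already resolved its latest offsets, so a stricter signal may be needed.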


Re: 退订 (Unsubscribe)

2023-09-11 Thread Hang Ruan
Hi,

To unsubscribe from the user-zh@flink.apache.org mailing list, please send
an email with any content to user-zh-unsubscr...@flink.apache.org. See
[1][2] for more details on managing your subscriptions.

Best,
Hang

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On Mon, Sep 11, 2023 at 10:23, 刘海 wrote:

> Unsubscribe


Re: 退订 (Unsubscribe)

2023-09-11 Thread Hang Ruan
Hi,

To unsubscribe from the user-zh@flink.apache.org mailing list, please send
an email with any content to user-zh-unsubscr...@flink.apache.org. See
[1][2] for more details on managing your subscriptions.

Best,
Hang

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists


On Mon, Sep 11, 2023 at 16:45, Hunk <24248...@163.com> wrote:

> Unsubscribe
>
> Sent from my iPhone


Re: Incompatible KafkaProducer version

2023-09-11 Thread Hang Ruan
Hi, Krzysztof.

I found that this part was changed in PR [1], which updated the Kafka
client version to 3.4.0.
This fix has not been released yet. You could build the connector yourself
and check whether it resolves the issue.

Best,
Hang

[1] https://github.com/apache/flink-connector-kafka/pull/11

On Sun, Sep 10, 2023 at 21:52, Krzysztof Jankiewicz wrote:

> Hi,
>
> I am currently working on a simple application that requires exactly-once
> end-to-end guarantee.
>
> I am reading data from Kafka and writing it back to Kafka.
>
> When I use `DeliveryGuarantee.AT_LEAST_ONCE` at the Kafka Sink level,
> everything works fine.
> Here's the relevant code:
>
> KafkaSink sink = KafkaSink.builder()
>     . . .
>     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     . . .
>     .build();
>
> Unfortunately, when I switch to DeliveryGuarantee.EXACTLY_ONCE, I
> encounter the following error during error handling (High Availability mode
> in k8s):
>
> Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
>   at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
> . . .
> Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
>
> The code causing this issue is as follows
> (org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer):
>
> Object transactionManager = this.getTransactionManager();
> synchronized (transactionManager) {
>     Object topicPartitionBookkeeper =
>         getField(transactionManager, "topicPartitionBookkeeper");
>     transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
>     invoke(topicPartitionBookkeeper, "reset");
>     setField(transactionManager, "producerIdAndEpoch",
>         createProducerIdAndEpoch(producerId, epoch));
>     . . .
> }
>
> I am using Apache Flink 1.17.1 and Apache Kafka Client
> (org.apache.kafka:kafka-clients) 3.5.1.
> I have examined the code of
> org.apache.kafka.clients.producer.internals.TransactionManager, which is
> used by org.apache.kafka.clients.producer.KafkaProducer.
> I can see the producerIdAndEpoch field, but there is no
> topicPartitionBookkeeper field.
>
> Could you please advise which version of KafkaProducer is compatible with
> the flink-connector-kafka? And am I missing something in my configuration?
>
> Kind regards
> Krzysztof
>
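
The failure quoted above comes down to a reflective lookup of a private field
that no longer exists in the client class on the classpath. The sketch below
shows how such a check behaves, using two stand-in classes; OldStyleManager and
NewStyleManager are invented for this example and are not the real Kafka
TransactionManager, and FieldProbe is a hypothetical helper name.

```java
// Hypothetical probe for checking whether a class still declares a field
// that reflection-based code depends on.
public class FieldProbe {

    /** Stand-in for a client class that still has the bookkeeper field. */
    public static class OldStyleManager {
        private Object producerIdAndEpoch;
        private Object topicPartitionBookkeeper;
    }

    /** Stand-in for a client class where the field was removed. */
    public static class NewStyleManager {
        private Object producerIdAndEpoch;
    }

    /** Returns true if the class or any superclass declares the named field. */
    public static boolean hasDeclaredField(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                c.getDeclaredField(name);
                return true;
            } catch (NoSuchFieldException e) {
                // Not declared on this class; try the superclass next.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String field = "topicPartitionBookkeeper";
        System.out.println("old: " + hasDeclaredField(OldStyleManager.class, field));
        System.out.println("new: " + hasDeclaredField(NewStyleManager.class, field));
    }
}
```

Pointing the same probe at
org.apache.kafka.clients.producer.internals.TransactionManager, loaded from the
job's actual classpath, would show whether a given kafka-clients version still
carries the field that the connector looks up.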


Flink Kubernetes operator deletes HA metadata after resuming from suspend

2023-09-11 Thread Evgeniy Lyutikov
Hi all!
After updating the operator to version 1.6.0, suspending and resuming Flink
jobs stopped working.
When a job resumes, the high-availability metadata is removed.

Suspend job:
2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[job.state : running -> suspended]), starting reconciliation.
2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][rec-job/rec-job] Job is in running state, ready for upgrade with LAST_STATE
2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Event  | Info| SUSPENDED   | Suspending existing deployment.
2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Deleting cluster with Foreground propagation
2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO ][rec-job/rec-job] Deleting JobManager deployment while preserving HA metadata.
2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Waiting for cluster shutdown... (5s)
2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Waiting for cluster shutdown... (10s)
2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Waiting for cluster shutdown... (15s)
2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Waiting for cluster shutdown... (20s)
2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource (job) has been suspended
2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][rec-job/rec-job] Resource fully reconciled, nothing to do...

Resume:
2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO ][rec-job/rec-job] JobManager is being deployed
2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource (job) has been suspended
2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[job.state : suspended -> running]), starting reconciliation.
2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is being upgraded
2023-09-11 06:02:07,649 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][rec-job/rec-job] Deleting deployment with terminated application before new deployment
2023-09-11 06:02:07,649 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Deleting cluster with Foreground propagation
2023-09-11 06:02:07,649 o.a.f.k.o.s.NativeFlinkService [INFO ][rec-job/rec-job] Deleting JobManager deployment and HA metadata.
2023-09-11 06:02:07,691 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Deleting Kubernetes HA metadata
2023-09-11 06:02:07,820 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Waiting for cluster shutdown...
2023-09-11 06:02:07,831 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Cluster shutdown completed.
2023-09-11 06:02:07,975 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is being upgraded
2023-09-11 06:02:07,987 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Event  | Info| SUBMIT  | Starting deployment
2023-09-11 06:02:07,987 o.a.f.k.o.s.AbstractFlinkService [INFO ][rec-job/rec-job] Deploying application cluster requiring last-state from HA metadata
2023-09-11 06:02:07,999 o.a.f.k.o.c.FlinkDeploymentController [ERROR][rec-job/rec-job] Flink recovery failed
2023-09-11 06:02:08,012 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Event  | Warning | RESTOREFAILED   | HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
2023-09-11 06:02:08,099 o.a.f.k.o.l.AuditUtils [INFO ][rec-job/rec-job] >>> Status | Error   | UPGRADING   | {"type":"org.apache.flink.kubernetes.operator.exception.RecoveryFailureException","message":"HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the