Is it practical to enrich a Flink DataStream in a middle operator with Flink Stateful Functions?

2022-09-26 Thread Marco Villalobos
I indeed see the value of Flink Stateful Functions.

However, if I already have a Flink Job, is it possible to enrich a datastream 
with it?

For example, like this:





I really don't see how it would fit such a purpose.  But I do see that it 
would be very useful at the end of a Flink job, not for enrichment but more 
as a handoff, like this:



Would anybody care to elaborate on how to perform enrichment of a stream the 
right way in Flink? 
I always find it problematic, and I was hoping Stateful Functions would provide 
a silver bullet for that, but I think my thinking is incorrect.

Thank you.

Marco A. Villalobos

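Re: Increasing the Kafka partition count in a Flink CDC + Kafka scenario

2022-09-26 Thread JasonLee
Hi,
This has nothing to do with restarting the job. You need to define a custom 
partitioning strategy for writing to Kafka.


Best
JasonLee


 Original message 
| From | casel.chen |
| Sent | 2022-09-26 23:21 |
| To | user-zh@flink.apache.org |
| Subject | Increasing the Kafka partition count in a Flink CDC + Kafka scenario |
When Flink CDC consumes from MySQL and writes to Kafka, the data volume is 
small at first, so the topic may be given only 3 partitions. Later, as the 
business data volume grows, more partitions need to be added, for example 12. 
So the question is: how do we make sure that the change history of the same 
record is sent to the same Kafka partition, so that downstream consumption 
stays ordered? Restarting the job does not seem to solve this either, does it?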
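
A minimal sketch of why adding partitions disturbs per-key ordering, assuming a hash-modulo partitioner. It does not reproduce Kafka's actual hashing; `Arrays.hashCode` is only a stand-in, and the class name and the sample keys in `database.table.primaryKey` form are hypothetical. Because the partition is `hash mod partitionCount`, changing the count from 3 to 12 changes the result for many keys, which is why a custom partitioning strategy has to keep the key-to-partition mapping stable across the change.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative only: mimics hash-modulo partitioning, not Kafka's real partitioner. */
class PartitionDemo {

    /** Maps a record key to a partition as a hash-modulo partitioner would. */
    static int partitionFor(String key, int numPartitions) {
        // Stand-in hash over the key bytes (assumption; Kafka uses a different hash).
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        // Hypothetical keys built as database.table.primaryKey.
        String[] keys = {"shop.orders.1", "shop.orders.2", "shop.users.42"};
        for (String key : keys) {
            System.out.printf("%s: with 3 partitions -> %d, with 12 partitions -> %d%n",
                    key, partitionFor(key, 3), partitionFor(key, 12));
        }
    }
}
```

In this sketch the old partition always equals the new partition modulo 3, because 12 is a multiple of 3, so a key either stays where it was or moves to one of three specific new partitions.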

Re: Sorting by source event time

2022-09-26 Thread yuxia
You can change it to "ORDER BY eventTime", and it should work.

You can sort on event time, but it must be in ascending time order and without 
'LIMIT'.
If you still want descending order, I think you can try setting the 
internal configuration `__table.exec.sort.non-temporal.enabled__` to true.
But remember that it is only experimental and may bring unexpected behavior.
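
A toy model of why a streaming sort on event time can only be ascending. This is not Flink's sort operator; the class and method names are hypothetical, and events are reduced to their timestamps. The idea is that a watermark `w` promises no further events with a timestamp at or below `w`, so everything buffered up to `w` can be emitted in ascending order. A descending sort would have to emit the latest event first, which an unbounded stream can never know.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative only: a toy watermark-driven sort, not Flink's implementation. */
class EventTimeSortSketch {

    // Min-heap on timestamp: the earliest buffered event is always at the head.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    /** Buffers an event, represented here only by its timestamp. */
    void onEvent(long timestamp) {
        buffer.add(timestamp);
    }

    /** Emits, in ascending order, every buffered event at or below the watermark. */
    List<Long> onWatermark(long watermark) {
        List<Long> emitted = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            emitted.add(buffer.poll());
        }
        return emitted;
    }

    public static void main(String[] args) {
        EventTimeSortSketch sort = new EventTimeSortSketch();
        sort.onEvent(5);
        sort.onEvent(2);
        sort.onEvent(9);
        System.out.println(sort.onWatermark(6));  // prints [2, 5]
        sort.onEvent(7);
        System.out.println(sort.onWatermark(10)); // prints [7, 9]
    }
}
```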

Best regards,
Yuxia

- Original Message -
From: "Noel OConnor" 
To: "User" 
Sent: Tuesday, September 27, 2022, 2:10:36 AM
Subject: Sorting by source event time

Hi,
I have a temporary view created from a datastream.

tableEnv.createTemporaryView("productDetailsView", productStream,
        Schema.newBuilder()
                .columnByMetadata("eventTime", "TIMESTAMP_LTZ(3)", "rowtime", Boolean.TRUE)
                .watermark("eventTime", "SOURCE_WATERMARK()")
                .build());


and I'm trying to sort it using the following

Table resultTable2 = tableEnv.sqlQuery(
        "SELECT * FROM productDetailsView ORDER BY eventTime DESC");

but I get the following error

Caused by: org.apache.flink.table.api.TableException: Sort on a
non-time-attribute field is not supported.

Can you sort on event time or does it have to be part of the actual
payload data type?
Have I missed something obvious ?

cheers
Noel


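Can a control stream change a job's ExecutionGraph?

2022-09-26 Thread casel.chen
I have a data synchronization scenario in which I would like to change the 
synchronization targets dynamically, in real time, by modifying configuration. 
For example, Flink CDC synchronizes change data from MySQL into Kafka in real 
time. If the business later requires the same data to also be synchronized 
into MongoDB, can I change the synchronization configuration so that the 
targets change dynamically (from one to several) without stopping the original 
job? Or, if Flink CDC synchronizes the change data of a whole MySQL database 
into a single Kafka topic and the business later requires one topic per table, 
can that also be achieved by modifying configuration?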

Re:Re: Can Flink's consumption rate be adjusted?

2022-09-26 Thread casel.chen
The Kafka consumer config has some parameters that can be used to achieve 
rate limiting, for example
max.partition.fetch.bytes
fetch.max.bytes
max.poll.records
For details, see https://kafka.apache.org/24/documentation.html#consumerconfigs
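
A minimal sketch of setting these three consumer properties with `java.util.Properties`. The class name is hypothetical and the values are placeholders, not recommendations. Note that these settings bound how much data a single fetch or poll returns; they do not enforce a fixed records-per-second rate.

```java
import java.util.Properties;

/** Illustrative only: the values below are placeholders, not tuning advice. */
class ConsumerThrottleProps {

    /** Builds consumer properties that cap the size of each fetch and poll. */
    static Properties build() {
        Properties props = new Properties();
        // Upper bound on the data returned per partition in one fetch (bytes).
        props.setProperty("max.partition.fetch.bytes", "262144");
        // Upper bound on the data returned by one fetch request (bytes).
        props.setProperty("fetch.max.bytes", "1048576");
        // Upper bound on the number of records returned by a single poll() call.
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With Flink's `KafkaSource`, such a `Properties` object could be passed through the builder's `setProperties` call, as in the Kafka source example elsewhere in this archive.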

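At 2022-09-26 23:27:30, "yidan zhao" wrote:
>Probably not. The Kafka client itself has no rate-limiting feature.
>
>Jason_H wrote on Mon, Sep 26, 2022 at 10:17:
>>
>> Hi everyone,
>> When we consume from Kafka with Flink, can we set a custom consumption 
>> rate in code to adjust the consumption capacity on the source side?
>>
>>
>> Jason_H
>> hyb_he...@163.com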


Re:Re: Increasing the Kafka partition count in a Flink CDC + Kafka scenario

2022-09-26 Thread casel.chen
Yes, the message key is composed of `database name + table name + primary key value`.

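At 2022-09-26 23:29:18, "yidan zhao" wrote:
>How was this implemented before? Through the Kafka record key?
>
>casel.chen wrote on Mon, Sep 26, 2022 at 23:21:
>>
>> When Flink CDC consumes from MySQL and writes to Kafka, the data volume is 
>> small at first, so the topic may be given only 3 partitions. Later, as the 
>> business data volume grows, more partitions need to be added, for example 
>> 12. So the question is: how do we make sure that the change history of the 
>> same record is sent to the same Kafka partition, so that downstream 
>> consumption stays ordered? Restarting the job does not seem to solve this 
>> either, does it?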


Sorting by source event time

2022-09-26 Thread Noel OConnor
Hi,
I have a temporary view created from a datastream.

tableEnv.createTemporaryView("productDetailsView", productStream,
        Schema.newBuilder()
                .columnByMetadata("eventTime", "TIMESTAMP_LTZ(3)", "rowtime", Boolean.TRUE)
                .watermark("eventTime", "SOURCE_WATERMARK()")
                .build());


and I'm trying to sort it using the following

Table resultTable2 = tableEnv.sqlQuery(
        "SELECT * FROM productDetailsView ORDER BY eventTime DESC");

but I get the following error

Caused by: org.apache.flink.table.api.TableException: Sort on a
non-time-attribute field is not supported.

Can you sort on event time or does it have to be part of the actual
payload data type?
Have I missed something obvious ?

cheers
Noel


RE: Kafka source stops consuming messages from topic after some minutes

2022-09-26 Thread alfredo.vasquez.spglobal.com via user
Hello, thank you for your response.
Just an update on this issue: it was not a problem in the Flink job. I found 
out it was related to this fluentd issue 
(https://github.com/fluent/fluentd/issues/3614), which is why I was not getting 
all the expected logs.
Checking the output Kafka topic, I see all messages correctly processed.

Regards,

From: Martijn Visser 
Sent: Friday, September 2, 2022 4:54 AM
To: Vasquez, Alfredo 
Cc: user@flink.apache.org
Subject: Re: Kafka source stops consuming messages from topic after some minutes

My initial thought is that there's something in your business logic. You're 
reading from one Kafka topic, then you're mentioning that it's "connected" to 
another Kafka topic. What type of business logic are you executing? Are you 
joining data, looking things up etc? My suspicion would be that in this process 
there's an issue which causes that operator to not progress as quickly, causing 
the source to pause/stop reading.

On Thu, Sep 1, 2022 at 22:40, alfredo.vasquez.spglobal.com via user 
<user@flink.apache.org> wrote:
Hello,

I'm using flink-connector-kafka version 1.15.2 to consume messages from a Kafka 
topic with 3 partitions; the stream is later connected to another Kafka 
source and then processed in a BroadcastProcessFunction.

The Kafka source is created as follows:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60");
properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "90");

KafkaSource kafkaSource = KafkaSource.builder()
  .setBootstrapServers("localhost:9092")
  .setTopics("mytopic")
  .setGroupId("group-id")
  .setClientIdPrefix("client-id")
  .setStartingOffsets(OffsetsInitializer.latest())
  .setProperty("security.protocol", "SSL")
  .setProperty("partition.discovery.interval.ms", "30")
  .setProperties(properties)
  .setDeserializer(new StringDeserializationSchema())
  .build();

DataStreamSource myStreamSource =
  env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "myStreamSource");


Then I start sending 10 messages per second to the topic and notice that the 
consumer starts reading messages, but after some minutes it stops reading 
from the topic. For example, if I send 3000 messages to the topic, only around 
1200 or 2000 are consumed.
I do not get any exception or error message in the task manager logs, the job 
does not restart, and the backpressure is around 15 to 20% while it is reading 
messages and then drops to 0%.

Please let me know any suggestion or additional information required to fix 
this issue.

Best.



The information contained in this message is intended only for the recipient, 
and may be a confidential attorney-client communication or may otherwise be 
privileged and confidential and protected from disclosure. If the reader of 
this message is not the intended recipient, or an employee or agent responsible 
for delivering this message to the intended recipient, please be aware that any 
dissemination or copying of this communication is strictly prohibited. If you 
have received this communication in error, please immediately notify us by 
replying to the message and deleting it from your computer. S&P Global Inc. 
reserves the right, subject to applicable local law, to monitor, review and 
process the content of any electronic message or information sent to or from 
S&P Global Inc. e-mail addresses without informing the sender or recipient of 
the message. By sending electronic message or information to S&P Global Inc. 
e-mail addresses you, as the sender, are consenting to S&P Global Inc. 
processing any of your personal data therein.


Loading broadcast state on BroadcastProcessFunction instantiation or open method

2022-09-26 Thread alfredo.vasquez.spglobal.com via user
Hello community.

Currently we have a BroadcastProcessFunction implementation that is storing the 
broadcast state using a MapStateDescriptor.
I have a use case that needs to load the BroadcastState to perform some 
operation before receiving any processElement or processBroadcastElement 
message.

Is there a way to load the BroadcastState on BroadcastProcessFunction  
instantiation, overriding open(Configuration parameters) method or by 
overriding some other callback function?

Kind regards,



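Re: Increasing the Kafka partition count in a Flink CDC + Kafka scenario

2022-09-26 Thread yidan zhao
How was this implemented before? Through the Kafka record key?

casel.chen wrote on Mon, Sep 26, 2022 at 23:21:
>
> When Flink CDC consumes from MySQL and writes to Kafka, the data volume is 
> small at first, so the topic may be given only 3 partitions. Later, as the 
> business data volume grows, more partitions need to be added, for example 
> 12. So the question is: how do we make sure that the change history of the 
> same record is sent to the same Kafka partition, so that downstream 
> consumption stays ordered? Restarting the job does not seem to solve this 
> either, does it?


Re: Can Flink's consumption rate be adjusted?

2022-09-26 Thread yidan zhao
Probably not. The Kafka client itself has no rate-limiting feature.

Jason_H wrote on Mon, Sep 26, 2022 at 10:17:
>
> Hi everyone,
> When we consume from Kafka with Flink, can we set a custom consumption 
> rate in code to adjust the consumption capacity on the source side?
>
>
> Jason_H
> hyb_he...@163.com


Can Flink CDC synchronize DDL statements?

2022-09-26 Thread casel.chen
Can Flink CDC synchronize DDL statements to achieve schema synchronization? 
For example, create table, create index, truncate table, etc.

Can Flink CDC synchronize table schemas to a downstream Kafka topic?

2022-09-26 Thread casel.chen
Can Flink CDC synchronize table schemas to a downstream Kafka topic? 
Something similar to the Confluent Kafka schema registry: create a _schema 
topic in the downstream Kafka, where the key is the table name and the value 
is the schema in Avro format. If this is possible, how can it be implemented?

Increasing the Kafka partition count in a Flink CDC + Kafka scenario

2022-09-26 Thread casel.chen
When Flink CDC consumes from MySQL and writes to Kafka, the data volume is 
small at first, so the topic may be given only 3 partitions. Later, as the 
business data volume grows, more partitions need to be added, for example 12. 
So the question is: how do we make sure that the change history of the same 
record is sent to the same Kafka partition, so that downstream consumption 
stays ordered? Restarting the job does not seem to solve this either, does it?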

Re: Cancel a job in status INITIALIZING

2022-09-26 Thread Matthias Pohl via user
Can you provide the JobManager logs for this case? It sounds odd that the
job was stuck in the INITIALIZING phase.

Matthias

On Wed, Sep 21, 2022 at 11:50 AM Christian Lorenz via user <
user@flink.apache.org> wrote:

> Hi,
>
>
>
> we’re running a Flink Cluster in standalone/session mode. During a restart
> of a jobmanager one job was stuck in status INITIALIZING.
>
> When trying to cancel the job via CLI the command failed with a
> java.util.concurrent.TimeoutException.
>
> The only way to get rid of this job for us was to stop the jobmanagers and
> delete the zookeeper root node.
>
> Is there a better way of handling this issue as this seems to be very
> unclean to me.
>
>
>
> Kind regards,
>
> Christian
>
> Mapp Digital Germany GmbH with registered offices at Sandstr. 3, 80335
> München.
> Registered with the District Court München HRB 226181
> Managing Directors: Frasier, Christopher & Warren, Steve
>
> This e-mail is from Mapp Digital and its international legal entities and
> may contain information that is confidential or proprietary.
> If you are not the intended recipient, do not read, copy or distribute the
> e-mail or any attachments. Instead, please notify the sender and delete the
> e-mail and any attachments.
> Please consider the environment before printing. Thank you.
>


Re: Jobmanager fails to come up if the job has an issue

2022-09-26 Thread Matthias Pohl via user
Yes, the JobManager will failover in HA mode and all jobs would be
recovered.

On Mon, Sep 26, 2022 at 2:06 PM ramkrishna vasudevan <
ramvasu.fl...@gmail.com> wrote:

> Thanks @Matthias Pohl  . This is informative.  So
> generally in a session cluster if I have more than one job and only one of
> them has this issue, still we will face the same problem?
>
> Regards
> Ram
>
> On Mon, Sep 26, 2022 at 4:32 PM Matthias Pohl 
> wrote:
>
>> I see. Thanks for sharing the logs. It's related to FLINK-9097 [1]. In
>> order for the job to not be cleaned up entirely after a failure while
>> submitting the job, the JobManager is failed fatally resulting in a
>> failover. That's what you're experiencing.
>>
>> One solution is to fix the permission issue to make the job recover
>> without problems. If that's not what you want to do, you could delete the
>> entry with the key 'jobGraph-04ae99777ee2ed34c13fe8120e68436e' from the
>> JobGraphStore ConfigMap (based on your logs it should
>> be flink-972ac3d8028e45fcafa9b8b7b7f1dafb-cluster-config-map). This will
>> prevent the JobManager from recovering this specific job. Keep in mind that
>> you have to clean up any job-related data by yourself in that case.
>>
>> I hope that helps.
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9097
>>
>> On Mon, Sep 26, 2022 at 12:26 PM ramkrishna vasudevan <
>> ramvasu.fl...@gmail.com> wrote:
>>
>>> I got some logs and stack traces from our backend storage. This is not
>>> the entire log though. Can this be useful?  With these set of logs messages
>>> the job manager kept restarting.
>>>
>>> Regards
>>> Ram
>>>
>>> On Mon, Sep 26, 2022 at 3:11 PM ramkrishna vasudevan <
>>> ramvasu.fl...@gmail.com> wrote:
>>>
>>>> Thank you very much for the reply. I have lost the k8s cluster in this
>>>> case before I could capture the logs. I will try to repro this and get back
>>>> to you.
>>>>
>>>> Regards
>>>> Ram
>>>>
>>>> On Mon, Sep 26, 2022 at 12:42 PM Matthias Pohl
>>>> wrote:
>>>>
>>>>> Hi Ramkrishna,
>>>>> thanks for reaching out to the Flink community. Could you share the
>>>>> JobManager logs to get a better understanding of what's going on? I'm
>>>>> wondering why the JobManager is failing when the actual problem is that
>>>>> the job is struggling to access a folder. It sounds like there are multiple
>>>>> problems here.
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> On Mon, Sep 26, 2022 at 6:25 AM ramkrishna vasudevan <
>>>>> ramvasu.fl...@gmail.com> wrote:
>>>>>
>>>>>> Hi all
>>>>>>
>>>>>> I have a simple job where we read for a given path in cloud storage
>>>>>> to watch for new files in a given folder. While I setup my job there was
>>>>>> some permission issue on the folder. The job is a STREAMING job.
>>>>>> The cluster is set in the session mode and is running on Kubernetes.
>>>>>> The job manager since then is failing to come back up and every time
>>>>>> it fails with the permission issue. But the point is how should I recover
>>>>>> my cluster in this case. Since JM is not there the UI is also not working
>>>>>> and how do I remove the bad job from the JM.
>>>>>>
>>>>>> Regards
>>>>>> Ram
>>>>>>
>>>>>


Re: JobManager restarts on job failure

2022-09-26 Thread Matthias Pohl via user
That's a good point. I forgot about these options. You're right. Cleanup
wouldn't be done in that case. So, upgrading would be a viable option as
you suggested.

Matthias

On Mon, Sep 26, 2022 at 12:53 PM Gyula Fóra  wrote:

> Maybe it is a stupid question but in Flink 1.15 with the following configs
> enabled:
>
> SHUTDOWN_ON_APPLICATION_FINISH = false
> SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = true
>
> I think jobmanager pod would not restart but simply go to a terminal
> failed state right?
>
> Gyula
>
> On Mon, Sep 26, 2022 at 12:31 PM Matthias Pohl 
> wrote:
>
>> Thanks Evgeniy for reaching out to the community and Gyula for picking it
>> up. I haven't looked into the k8s operator in much detail, yet. So, help me
>> out if I miss something here. But I'm afraid that this is not something
>> that would be fixed by upgrading to 1.15.
>> The issue here is that we're recovering from an external checkpoint using
>> the same job ID (the default one used for any Flink cluster in Application
>> Mode) and the same cluster ID, if I understand correctly. Now, the job is
>> failing during initialization. Currently, this causes a global cleanup [1].
>> All HA data including the checkpoints are going to be deleted. I created
>> FLINK-29415 [2] to cover this.
>>
>> I'm wondering whether we could work around this problem by specifying a
>> random job ID through PipelineOptionsInternal [3] in the Kubernetes
>> Operator. But I haven't looked into all the consequences around that. And
>> it feels wrong to make this configuration parameter publicly usable.
>>
>> Another option might be to use ExecutionMode.RECOVERY in case of an
>> initialization failure when recovering from an external Checkpoint in
>> Application Mode (like we do it for internal recovery already).
>>
>> I'm looking forward to your opinion.
>> Matthias
>>
>> [1]
>> https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663
>> [2] https://issues.apache.org/jira/browse/FLINK-29415
>> [3]
>> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29
>>
>> On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra  wrote:
>>
>>> I see I think we have seen this issue with others before, in Flink 1.15
>>> it is solved by the newly introduced JobResultStore. The operator also
>>> configures that automatically for 1.15 to avoid this.
>>>
>>> Gyula
>>>
>>> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov 
>>> wrote:
>>>
>>>> Thanks for the answer.
>>>> I think this is not about the operator issue, kubernetes deployment
>>>> just restarts the fallen pod, restarted jobmanager without HA metadata
>>>> starts the job itself from an empty state.
>>>>
>>>> I'm looking for a way to prevent it from exiting in case of a
>>>> job error (we use application mode cluster).
>>>>
>>>>
>>>>
>>>> --
>>>> *From:* Gyula Fóra
>>>> *Sent:* September 20, 2022, 19:49:37
>>>> *To:* Evgeniy Lyutikov
>>>> *Cc:* user@flink.apache.org
>>>> *Subject:* Re: JobManager restarts on job failure
>>>>
>>>> The best thing for you to do would be to upgrade to Flink 1.15 and the
>>>> latest operator version.
>>>> In Flink 1.15 we have the option to interact with the Flink jobmanager
>>>> even after the job FAILED and the operator leverages this for a much more
>>>> robust behaviour.
>>>>
>>>> In any case the operator should not ever start the job from an empty
>>>> state (even if it FAILED), if you think that's happening could you please
>>>> open a JIRA ticket with the accompanying JM and Operator logs?
>>>>
>>>> Thanks
>>>> Gyula
>>>>
>>>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> We are using Flink 1.14.4 with the Flink Kubernetes operator.
>>>>>
>>>>> Sometimes when updating a job, it fails on startup and flink removes
>>>>> all HA metadata and exits the jobmanager.
>>>>>
>>>>>
>>>>> 2022-09-14 14:54:44,534 INFO
>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring
>>>>> job  from Checkpoint 30829 @ 1663167158684
>>>>> for  located at
>>>>> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
>>>>> 2022-09-14 14:54:44,638 INFO
>>>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>>>>>  reached terminal state FAILED.
>>>>> org.apache.flink.runtime.client.JobInitializationException: Could not
>>>>> start the JobMaster.
>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>> java.lang.IllegalStateException: There is no operator for the state
>>>>> 4e1d9dde287c33a35e7970cbe64a40fe
>>>>> 2022-09-14 14:54:44,930 ERROR
>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
>>>>> error occurred in the cluster entrypoint.

Re: Jobmanager fails to come up if the job has an issue

2022-09-26 Thread ramkrishna vasudevan
Thanks @Matthias Pohl  . This is informative.  So
generally in a session cluster if I have more than one job and only one of
them has this issue, still we will face the same problem?

Regards
Ram

On Mon, Sep 26, 2022 at 4:32 PM Matthias Pohl 
wrote:

> I see. Thanks for sharing the logs. It's related to FLINK-9097 [1]. In
> order for the job to not be cleaned up entirely after a failure while
> submitting the job, the JobManager is failed fatally resulting in a
> failover. That's what you're experiencing.
>
> One solution is to fix the permission issue to make the job recover
> without problems. If that's not what you want to do, you could delete the
> entry with the key 'jobGraph-04ae99777ee2ed34c13fe8120e68436e' from the
> JobGraphStore ConfigMap (based on your logs it should
> be flink-972ac3d8028e45fcafa9b8b7b7f1dafb-cluster-config-map). This will
> prevent the JobManager from recovering this specific job. Keep in mind that
> you have to clean up any job-related data by yourself in that case.
>
> I hope that helps.
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-9097
>
> On Mon, Sep 26, 2022 at 12:26 PM ramkrishna vasudevan <
> ramvasu.fl...@gmail.com> wrote:
>
>> I got some logs and stack traces from our backend storage. This is not
>> the entire log though. Can this be useful?  With these set of logs messages
>> the job manager kept restarting.
>>
>> Regards
>> Ram
>>
>> On Mon, Sep 26, 2022 at 3:11 PM ramkrishna vasudevan <
>> ramvasu.fl...@gmail.com> wrote:
>>
>>> Thank you very much for the reply. I have lost the k8s cluster in this
>>> case before I could capture the logs. I will try to repro this and get back
>>> to you.
>>>
>>> Regards
>>> Ram
>>>
>>> On Mon, Sep 26, 2022 at 12:42 PM Matthias Pohl 
>>> wrote:
>>>
>>>> Hi Ramkrishna,
>>>> thanks for reaching out to the Flink community. Could you share the
>>>> JobManager logs to get a better understanding of what's going on? I'm
>>>> wondering why the JobManager is failing when the actual problem is that the
>>>> job is struggling to access a folder. It sounds like there are multiple
>>>> problems here.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Mon, Sep 26, 2022 at 6:25 AM ramkrishna vasudevan <
>>>> ramvasu.fl...@gmail.com> wrote:
>>>>
>>>>> Hi all
>>>>>
>>>>> I have a simple job where we read for a given path in cloud storage to
>>>>> watch for new files in a given folder. While I setup my job there was some
>>>>> permission issue on the folder. The job is a STREAMING job.
>>>>> The cluster is set in the session mode and is running on Kubernetes.
>>>>> The job manager since then is failing to come back up and every time
>>>>> it fails with the permission issue. But the point is how should I recover
>>>>> my cluster in this case. Since JM is not there the UI is also not working
>>>>> and how do I remove the bad job from the JM.
>>>>>
>>>>> Regards
>>>>> Ram
>>>>>



Re: Jobmanager fails to come up if the job has an issue

2022-09-26 Thread Matthias Pohl via user
I see. Thanks for sharing the logs. It's related to FLINK-9097 [1]. In
order for the job to not be cleaned up entirely after a failure while
submitting the job, the JobManager is failed fatally resulting in a
failover. That's what you're experiencing.

One solution is to fix the permission issue to make the job recover without
problems. If that's not what you want to do, you could delete the entry
with the key 'jobGraph-04ae99777ee2ed34c13fe8120e68436e' from the
JobGraphStore ConfigMap (based on your logs it should
be flink-972ac3d8028e45fcafa9b8b7b7f1dafb-cluster-config-map). This will
prevent the JobManager from recovering this specific job. Keep in mind that
you have to clean up any job-related data by yourself in that case.

I hope that helps.
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-9097

On Mon, Sep 26, 2022 at 12:26 PM ramkrishna vasudevan <
ramvasu.fl...@gmail.com> wrote:

> I got some logs and stack traces from our backend storage. This is not the
> entire log though. Can this be useful?  With these set of logs messages the
> job manager kept restarting.
>
> Regards
> Ram
>
> On Mon, Sep 26, 2022 at 3:11 PM ramkrishna vasudevan <
> ramvasu.fl...@gmail.com> wrote:
>
>> Thank you very much for the reply. I have lost the k8s cluster in this
>> case before I could capture the logs. I will try to repro this and get back
>> to you.
>>
>> Regards
>> Ram
>>
>> On Mon, Sep 26, 2022 at 12:42 PM Matthias Pohl 
>> wrote:
>>
>>> Hi Ramkrishna,
>>> thanks for reaching out to the Flink community. Could you share the
>>> JobManager logs to get a better understanding of what's going on? I'm
>>> wondering why the JobManager is failing when the actual problem is that the
>>> job is struggling to access a folder. It sounds like there are multiple
>>> problems here.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Mon, Sep 26, 2022 at 6:25 AM ramkrishna vasudevan <
>>> ramvasu.fl...@gmail.com> wrote:
>>>
>>>> Hi all
>>>>
>>>> I have a simple job where we read for a given path in cloud storage to
>>>> watch for new files in a given folder. While I setup my job there was some
>>>> permission issue on the folder. The job is a STREAMING job.
>>>> The cluster is set in the session mode and is running on Kubernetes.
>>>> The job manager since then is failing to come back up and every time it
>>>> fails with the permission issue. But the point is how should I recover my
>>>> cluster in this case. Since JM is not there the UI is also not working and
>>>> how do I remove the bad job from the JM.
>>>>
>>>> Regards
>>>> Ram

>>>


Re: JobManager restarts on job failure

2022-09-26 Thread Gyula Fóra
Maybe it is a stupid question but in Flink 1.15 with the following configs
enabled:

SHUTDOWN_ON_APPLICATION_FINISH = false
SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = true

I think the jobmanager pod would not restart but simply go to a terminal
failed state, right?
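
Assuming the two constants refer to Flink's `DeploymentOptions`, their configuration keys would be `execution.shutdown-on-application-finish` and `execution.submit-failed-job-on-application-error`. The following is a minimal sketch that renders them as `flink-conf.yaml` lines; the class and method names are hypothetical, and the keys should be checked against the Flink version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: renders the two options from this thread as config lines. */
class ApplicationModeOptions {

    /** The two options with the values discussed in the thread (keys are an assumption). */
    static Map<String, String> options() {
        Map<String, String> conf = new LinkedHashMap<>();
        // SHUTDOWN_ON_APPLICATION_FINISH = false
        conf.put("execution.shutdown-on-application-finish", "false");
        // SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = true
        conf.put("execution.submit-failed-job-on-application-error", "true");
        return conf;
    }

    /** Formats the options as "key: value" lines, one per option. */
    static String toFlinkConfYaml(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        conf.forEach((key, value) -> sb.append(key).append(": ").append(value).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(toFlinkConfYaml(options()));
    }
}
```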

Gyula

On Mon, Sep 26, 2022 at 12:31 PM Matthias Pohl 
wrote:

> Thanks Evgeniy for reaching out to the community and Gyula for picking it
> up. I haven't looked into the k8s operator in much detail, yet. So, help me
> out if I miss something here. But I'm afraid that this is not something
> that would be fixed by upgrading to 1.15.
> The issue here is that we're recovering from an external checkpoint using
> the same job ID (the default one used for any Flink cluster in Application
> Mode) and the same cluster ID, if I understand correctly. Now, the job is
> failing during initialization. Currently, this causes a global cleanup [1].
> All HA data including the checkpoints are going to be deleted. I created
> FLINK-29415 [2] to cover this.
>
> I'm wondering whether we could work around this problem by specifying a
> random job ID through PipelineOptionsInternal [3] in the Kubernetes
> Operator. But I haven't looked into all the consequences around that. And
> it feels wrong to make this configuration parameter publicly usable.
>
> Another option might be to use ExecutionMode.RECOVERY in case of an
> initialization failure when recovering from an external Checkpoint in
> Application Mode (like we do it for internal recovery already).
>
> I'm looking forward to your opinion.
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663
> [2] https://issues.apache.org/jira/browse/FLINK-29415
> [3]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29
>
> On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra  wrote:
>
>> I see I think we have seen this issue with others before, in Flink 1.15
>> it is solved by the newly introduced JobResultStore. The operator also
>> configures that automatically for 1.15 to avoid this.
>>
>> Gyula
>>
>> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov 
>> wrote:
>>
>>> Thanks for the answer.
>>> I think this is not about the operator issue, kubernetes deployment just
>>> restarts the fallen pod, restarted jobmanager without HA metadata
>>> starts the job itself from an empty state.
>>>
>>> I'm looking for a way to prevent it from exiting in case of a job error
>>> (we use application mode cluster).
>>>
>>>
>>>
>>> --
>>> *From:* Gyula Fóra 
>>> *Sent:* September 20, 2022, 19:49:37
>>> *To:* Evgeniy Lyutikov
>>> *Cc:* user@flink.apache.org
>>> *Subject:* Re: JobManager restarts on job failure
>>>
>>> The best thing for you to do would be to upgrade to Flink 1.15 and the
>>> latest operator version.
>>> In Flink 1.15 we have the option to interact with the Flink jobmanager
>>> even after the job FAILED and the operator leverages this for a much more
>>> robust behaviour.
>>>
>>> In any case the operator should not ever start the job from an empty
>>> state (even if it FAILED), if you think that's happening could you please
>>> open a JIRA ticket with the accompanying JM and Operator logs?
>>>
>>> Thanks
>>> Gyula
>>>
>>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov 
>>> wrote:
>>>
>>>> Hi,
>>>> We are using Flink 1.14.4 with the Flink Kubernetes operator.
>>>>
>>>> Sometimes when updating a job, it fails on startup and flink removes
>>>> all HA metadata and exits the jobmanager.
>>>>
>>>>
>>>> 2022-09-14 14:54:44,534 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring
>>>> job  from Checkpoint 30829 @ 1663167158684
>>>> for  located at
>>>> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
>>>> 2022-09-14 14:54:44,638 INFO
>>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>>>>  reached terminal state FAILED.
>>>> org.apache.flink.runtime.client.JobInitializationException: Could not
>>>> start the JobMaster.
>>>> Caused by: java.util.concurrent.CompletionException:
>>>> java.lang.IllegalStateException: There is no operator for the state
>>>> 4e1d9dde287c33a35e7970cbe64a40fe
>>>> 2022-09-14 14:54:44,930 ERROR
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
>>>> error occurred in the cluster entrypoint.
>>>> 2022-09-14 14:54:45,020 INFO
>>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>>>> Clean up the high availability data for job
>>>> .
>>>> 2022-09-14 14:54:45,020 INFO
>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
 

Re: JobManager restarts on job failure

2022-09-26 Thread Matthias Pohl via user
Thanks Evgeniy for reaching out to the community and Gyula for picking it
up. I haven't looked into the k8s operator in much detail, yet. So, help me
out if I miss something here. But I'm afraid that this is not something
that would be fixed by upgrading to 1.15.
The issue here is that we're recovering from an external checkpoint using
the same job ID (the default one used for any Flink cluster in Application
Mode) and the same cluster ID, if I understand correctly. Now, the job is
failing during initialization. Currently, this causes a global cleanup [1].
All HA data including the checkpoints are going to be deleted. I created
FLINK-29415 [2] to cover this.

I'm wondering whether we could work around this problem by specifying a
random job ID through PipelineOptionsInternal [3] in the Kubernetes
Operator. But I haven't looked into all the consequences around that. And
it feels wrong to make this configuration parameter publicly usable.
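
A sketch of what generating such a random job ID could look like, assuming the internal option key is `$internal.pipeline.job-id` (the key defined in the `PipelineOptionsInternal` class linked as [3]) and that a job ID is written as 32 hexadecimal characters. The class name is hypothetical, and because the option is internal, this illustrates the idea rather than a recommended setup.

```java
import java.util.UUID;

/** Illustrative only: builds a random job ID string for the internal option. */
class RandomJobIdSketch {

    // Assumed key of the internal option; it is not part of Flink's public API.
    static final String JOB_ID_KEY = "$internal.pipeline.job-id";

    /** Returns 32 lowercase hex characters derived from a random UUID. */
    static String randomJobIdHex() {
        // A UUID prints as 32 hex digits plus 4 hyphens; dropping the hyphens leaves 32.
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        System.out.println(JOB_ID_KEY + ": " + randomJobIdHex());
    }
}
```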

Another option might be to use ExecutionMode.RECOVERY in case of an
initialization failure when recovering from an external Checkpoint in
Application Mode (like we do it for internal recovery already).

I'm looking forward to your opinion.
Matthias

[1]
https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663
[2] https://issues.apache.org/jira/browse/FLINK-29415
[3]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29

On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra  wrote:

> I see. I think we have seen this issue with others before. In Flink 1.15 it
> is solved by the newly introduced JobResultStore. The operator also
> configures that automatically for 1.15 to avoid this.
>
> Gyula
>
> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov 
> wrote:
>
>> Thanks for the answer.
>> I don't think this is an operator issue. The Kubernetes deployment just
>> restarts the failed pod, and the restarted jobmanager, having no HA
>> metadata, starts the job from an empty state.
>>
>> I'm looking for a way to prevent the jobmanager from exiting in case of a
>> job error (we use an application mode cluster).
>>
>>
>>
>> --
>> *From:* Gyula Fóra 
>> *Sent:* September 20, 2022, 19:49:37
>> *To:* Evgeniy Lyutikov
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: JobManager restarts on job failure
>>
>> The best thing for you to do would be to upgrade to Flink 1.15 and the
>> latest operator version.
>> In Flink 1.15 we have the option to interact with the Flink jobmanager
>> even after the job FAILED and the operator leverages this for a much more
>> robust behaviour.
>>
>> In any case, the operator should never start the job from an empty
>> state (even if it FAILED). If you think that's happening, could you please
>> open a JIRA ticket with the accompanying JM and Operator logs?
>>
>> Thanks
>> Gyula
>>
>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov 
>> wrote:
>>
>>> Hi,
>>> We are using Flink 1.14.4 with the Flink Kubernetes operator.
>>>
>>> Sometimes when updating a job, it fails on startup, and Flink removes all
>>> HA metadata and exits the jobmanager.
>>>
>>>
>>> 2022-09-14 14:54:44,534 INFO
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring
>>> job  from Checkpoint 30829 @ 1663167158684
>>> for  located at
>>> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
>>> 2022-09-14 14:54:44,638 INFO
>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>>>  reached terminal state FAILED.
>>> org.apache.flink.runtime.client.JobInitializationException: Could not
>>> start the JobMaster.
>>> Caused by: java.util.concurrent.CompletionException:
>>> java.lang.IllegalStateException: There is no operator for the state
>>> 4e1d9dde287c33a35e7970cbe64a40fe
>>> 2022-09-14 14:54:44,930 ERROR
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
>>> error occurred in the cluster entrypoint.
>>> 2022-09-14 14:54:45,020 INFO
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>>> Clean up the high availability data for job
>>> .
>>> 2022-09-14 14:54:45,020 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
>>> KubernetesApplicationClusterEntrypoint down with application status
>>> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
>>> 2022-09-14 14:54:45,026 INFO
>>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
>>> down rest endpoint.
>>> 2022-09-14 14:54:46,122 INFO
>>> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting
>>> down remote daemon.
>>> 2022-09-14 14:54:46,321 INFO
>>> 

Re: Jobmanager fails to come up if the job has an issue

2022-09-26 Thread ramkrishna vasudevan
I got some logs and stack traces from our backend storage. This is not the
entire log, though. Can this be useful? With this set of log messages the
job manager kept restarting.

Regards
Ram

On Mon, Sep 26, 2022 at 3:11 PM ramkrishna vasudevan <
ramvasu.fl...@gmail.com> wrote:

> Thank you very much for the reply. I have lost the k8s cluster in this
> case before I could capture the logs. I will try to repro this and get back
> to you.
>
> Regards
> Ram
>
> On Mon, Sep 26, 2022 at 12:42 PM Matthias Pohl 
> wrote:
>
>> Hi Ramkrishna,
>> thanks for reaching out to the Flink community. Could you share the
>> JobManager logs to get a better understanding of what's going on? I'm
>> wondering why the JobManager is failing when the actual problem is that the
>> job is struggling to access a folder. It sounds like there are multiple
>> problems here.
>>
>> Best,
>> Matthias
>>
>> On Mon, Sep 26, 2022 at 6:25 AM ramkrishna vasudevan <
>> ramvasu.fl...@gmail.com> wrote:
>>
>>> Hi all
>>>
>>> I have a simple job that reads from a given path in cloud storage and
>>> watches for new files in a given folder. When I set up my job there was a
>>> permission issue on the folder. The job is a STREAMING job.
>>> The cluster runs in session mode on Kubernetes.
>>> Since then the job manager has been failing to come back up, and every time
>>> it fails with the permission issue. My question is how I should recover my
>>> cluster in this case. Since the JM is down, the UI is not working either, so
>>> how do I remove the bad job from the JM?
>>>
>>> Regards
>>> Ram
>>>
>>
Message
Starting execution of job WordCount (04ae99777ee2ed34c13fe8120e68436e) under job master id a6b99a98ea6483cedb49878b9af2465f.
Starting split enumerator for source Source: file-input -> tokenizer.
Closing SourceCoordinator for source Source: file-input -> tokenizer.
Source coordinator for source Source: file-input -> tokenizer closed.
Fatal error occurred in the cluster entrypoint.
The RpcEndpoint jobmanager_2 failed.
Enabling required built-in plugins
Linking flink-azure-fs-hadoop-1.13.1-0.1.15.jar to plugin directory
Successfully enabled flink-azure-fs-hadoop-1.13.1-0.1.15.jar
sed: couldn't open temporary file /opt/flink/conf/sedWarRga: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/sed6VtlPa: Read-only file system
/docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml: Permission denied
sed: couldn't open temporary file /opt/flink/conf/sedWNUSh9: Read-only file system
/docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting Job Manager
Starting standalonesession as a console application on host 972ac3d8028e45fcafa9b8b7b7f1dafb-flink-jobmanager-56457c946f9f6.


RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx1484783613 -Xms1484783613 -XX:MaxMetaspaceSize=268435456
dynamic_configs: -D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=209715203b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=1484783613b -D 
jobmanager.memory.jvm-overhead.max=209715203b
logs: WARNING: sun.reflect.Reflection.getCallerClass is not supported. This 
will impact performance.
INFO  [] - Loading configuration property: blob.server.port, 6124
INFO  [] - Loading configuration property: state.checkpoints.num-retained, 5
INFO  [] - Loading configuration property: historyserver.web.port, 8082
INFO  [] - Loading configuration property: rest.flamegraph.enabled, true
INFO  [] - Loading configuration property: jobmanager.rpc.address, 
flink-jobmanager-service
INFO  [] - Loading configuration property: fs.azure.account.auth.type, OAuth
INFO  [] - Loading configuration property: 
fs.azure.account.oauth.provider.type, 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
INFO  [] - Loading configuration property: fs.azure.account.oauth2.client.id, 
4fb013ac-9f6e-40f7-823b-721589d98db8
INFO  [] - Loading configuration property: state.savepoints.dir, 
abfs://vrppestor...@vrppestore.dfs.core.windows.net/972ac3d8028e45fcafa9b8b7b7f1dafb/savepoints
INFO  [] - Loading configuration property: 
fs.azure.identity.transformer.service.principal.substitution.list, '*'
INFO  [] - Loading configuration property: kubernetes.cluster-id, 
flink-972ac3d8028e45fcafa9b8b7b7f1dafb
INFO  [] - Loading configuration property: high-availability.storageDir, 
abfs://vrppestor...@vrppestore.dfs.core.windows.net/972ac3d8028e45fcafa9b8b7b7f1dafb/ha
INFO  [] - Loading configuration property: parallelism.default, 2
INFO  [] - Loading configuration property: kubernetes.namespace, 
972ac3d8028e45fcafa9b8b7b7f1dafb
INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
INFO  [] - Loading configuration property: metrics.reporters, prom
INFO  [] - Loading configuration property: metrics.reporter.prom.class, 
org.apache.flink.metrics.prometheus.PrometheusReporter
INFO  [] - Loading 
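
As a sanity check on the RESOURCE_PARAMS lines above, the memory components can be added up. The sketch below assumes Flink's JobManager memory model, in which total process memory is heap plus off-heap plus metaspace plus JVM overhead; the byte values are copied from the `dynamic_configs` line, and the function name is just for illustration.

```python
MIB = 1024 * 1024

# Byte values copied from the dynamic_configs line of the log above.
components = {
    "jobmanager.memory.heap.size": 1484783613,
    "jobmanager.memory.off-heap.size": 134217728,
    "jobmanager.memory.jvm-metaspace.size": 268435456,
    "jobmanager.memory.jvm-overhead.max": 209715203,
}


def total_process_bytes(parts):
    """Sum the memory components into a total process size in bytes."""
    return sum(parts.values())


if __name__ == "__main__":
    for key, size in components.items():
        print(f"{key}: {size / MIB:.1f} MiB")
    total = total_process_bytes(components)
    print(f"total: {total} bytes = {total / MIB:.0f} MiB")
```

The components sum to 2097152000 bytes, i.e. 2000 MiB, which suggests the JobManager was given a total process size of about 2000m. That setting itself is not visible in the truncated log, so treat it as an inference.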

Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-09-26 Thread Martijn Visser
Hi Yun Tang,

Sorry for the late reply. I haven't seen any tickets related to this topic.
I still think this is an important feature to have supported in Flink, and I
would love some volunteers on this topic.

Best regards,

Martijn

On Tue, Sep 13, 2022 at 7:47 AM Yun Tang  wrote:

> An interesting topic. I noticed that the DataHub community has launched
> a feature request discussion on Flink integration [1].
>
> @Martijn Visser  Has the Flink community created tickets to track this
> topic?
> From my current understanding, and just as Feng mentioned, Flink lacks rich
> information in FlinkJobListener for sending data lineage to external
> systems, something that Spark supports well.
>
>
>
> [1] https://feature-requests.datahubproject.io/p/flink-integration
>
>
> Best
> Yun Tang
> --
> *From:* wangqinghuan <1095193...@qq.com>
> *Sent:* Monday, January 17, 2022 18:27
> *To:* user@flink.apache.org 
> *Subject:* Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage
> integration
>
>
> We are using DataHub to address table-level lineage and column-level
> lineage for Flink SQL.
> On 2022/1/13 23:27, Martijn Visser wrote:
>
> Hi everyone,
>
> I'm currently checking out different metadata platforms, such as Amundsen
> [1] and Datahub [2]. In short, these types of tools try to address problems
> related to topics such as data discovery, data lineage and an overall data
> catalogue.
>
> I'm reaching out to the Dev and User mailing lists to get some feedback.
> It would really help if you could spend a couple of minutes to let me know
> whether you already use one of the two metadata platforms mentioned or
> another one, or whether you are evaluating such tools. If so, is that for
> use as a catalogue, for lineage, or for anything else? Any type of
> feedback on these types of tools is appreciated.
>
> Best regards,
>
> Martijn
>
> [1] https://github.com/amundsen-io/amundsen/
> [2] https://github.com/linkedin/datahub
>
>


Re: Jobmanager fails to come up if the job has an issue

2022-09-26 Thread ramkrishna vasudevan
Thank you very much for the reply. I have lost the k8s cluster in this case
before I could capture the logs. I will try to repro this and get back to
you.

Regards
Ram

On Mon, Sep 26, 2022 at 12:42 PM Matthias Pohl 
wrote:

> Hi Ramkrishna,
> thanks for reaching out to the Flink community. Could you share the
> JobManager logs to get a better understanding of what's going on? I'm
> wondering why the JobManager is failing when the actual problem is that the
> job is struggling to access a folder. It sounds like there are multiple
> problems here.
>
> Best,
> Matthias
>
> On Mon, Sep 26, 2022 at 6:25 AM ramkrishna vasudevan <
> ramvasu.fl...@gmail.com> wrote:
>
>> Hi all
>>
>> I have a simple job that reads from a given path in cloud storage and
>> watches for new files in a given folder. When I set up my job there was a
>> permission issue on the folder. The job is a STREAMING job.
>> The cluster runs in session mode on Kubernetes.
>> Since then the job manager has been failing to come back up, and every time
>> it fails with the permission issue. My question is how I should recover my
>> cluster in this case. Since the JM is down, the UI is not working either, so
>> how do I remove the bad job from the JM?
>>
>> Regards
>> Ram
>>
>


Re: Jobmanager fails to come up if the job has an issue

2022-09-26 Thread Matthias Pohl via user
Hi Ramkrishna,
thanks for reaching out to the Flink community. Could you share the
JobManager logs to get a better understanding of what's going on? I'm
wondering why the JobManager is failing when the actual problem is that the
job is struggling to access a folder. It sounds like there are multiple
problems here.

Best,
Matthias

On Mon, Sep 26, 2022 at 6:25 AM ramkrishna vasudevan <
ramvasu.fl...@gmail.com> wrote:

> Hi all
>
> I have a simple job that reads from a given path in cloud storage and
> watches for new files in a given folder. When I set up my job there was a
> permission issue on the folder. The job is a STREAMING job.
> The cluster runs in session mode on Kubernetes.
> Since then the job manager has been failing to come back up, and every time
> it fails with the permission issue. My question is how I should recover my
> cluster in this case. Since the JM is down, the UI is not working either, so
> how do I remove the bad job from the JM?
>
> Regards
> Ram
>