Re: Flink job Deployment problem

2024-06-05 Thread Hang Ruan
Hi, Fokou Toukam.

This error occurs when the schema of the sink does not match the schema
provided by the upstream.
You may need to check whether the type of the `features` field declared on the
sink is the same as the type produced by the upstream.
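A small self-contained sketch (the connectors and table names here are placeholders,
not taken from your job) of comparing the two schemas without running a job:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompareSchemas {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical upstream and sink definitions, only to illustrate the comparison.
        tEnv.executeSql("CREATE TABLE upstream_table (features ARRAY<DOUBLE>) "
                + "WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink_table (features ARRAY<DOUBLE>) "
                + "WITH ('connector' = 'blackhole')");

        // DESCRIBE resolves the schemas from the catalog without running a job; the type
        // printed for `features` must be identical on both sides (e.g. ARRAY<DOUBLE> on
        // both, not RAW(...) upstream vs ARRAY<DOUBLE> in the sink).
        tEnv.executeSql("DESCRIBE upstream_table").print();
        tEnv.executeSql("DESCRIBE sink_table").print();
    }
}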

Best,
Hang

Fokou Toukam, Thierry wrote on Thursday, June 6, 2024 at 10:22:

> Hi, I'm trying to deploy a Flink job but I get this error. How can I solve
> it, please?
>
> Thierry FOKOU | IT M.A.Sc Student
>
> Département de génie logiciel et TI
>
> École de technologie supérieure  |  Université du Québec
>
> 1100, rue Notre-Dame Ouest
>
> Montréal (Québec)  H3C 1K3
>
>
>


Re: Flink job Deployment problem

2024-06-05 Thread Xiqian YU
Hi Fokou,

It seems the `features` column was inferred to be of RAW type, which doesn't carry
any specific type information and causes the subsequent type cast to fail.

This sometimes happens when Flink can't infer the return type of a lambda
expression and no explicit return type information is provided [1]; it can be
solved by inserting an explicit .returns() call [2].
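For illustration, a minimal sketch (the parsing logic and types are assumptions, not
taken from your job) of attaching explicit return type information to a lambda that
produces the `features` values:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReturnsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without .returns(...), the lambda's result type is erased at compile time and
        // Flink may fall back to a RAW/generic type for the produced records.
        DataStream<double[]> features = env
                .fromElements("1.0,2.0,3.0")
                .map(line -> {
                    String[] parts = line.split(",");
                    double[] vector = new double[parts.length];
                    for (int i = 0; i < parts.length; i++) {
                        vector[i] = Double.parseDouble(parts[i]);
                    }
                    return vector;
                })
                .returns(double[].class); // explicit type information for the mapped records

        features.print();
        env.execute("returns-example");
    }
}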

If that doesn't solve your problem, could you please share more information,
like a code snippet or a minimal POC?

Regards,
yux

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/java_lambdas/
[2] 
https://stackoverflow.com/questions/70295970/flink-table-get-type-information


From: Fokou Toukam, Thierry
Date: Thursday, June 6, 2024 at 09:04
To: user
Subject: Flink job Deployment problem
Hi, I'm trying to deploy a Flink job but I get this error. How can I solve it,
please?
[inline image attachment]

Thierry FOKOU |  IT M.A.Sc Student

Département de génie logiciel et TI

École de technologie supérieure  |  Université du Québec

1100, rue Notre-Dame Ouest

Montréal (Québec)  H3C 1K3




Flink job Deployment problem

2024-06-05 Thread Fokou Toukam, Thierry
Hi, I'm trying to deploy a Flink job but I get this error. How can I solve it,
please?
[inline image attachment]

Thierry FOKOU |  IT M.A.Sc Student

Département de génie logiciel et TI

École de technologie supérieure  |  Université du Québec

1100, rue Notre-Dame Ouest

Montréal (Québec)  H3C 1K3




Re: How to start a flink job on a long running yarn cluster from a checkpoint (with arguments)

2024-05-25 Thread Junrui Lee
Hi Sachin,

Yes, that's correct. To resume from a savepoint, use the command bin/flink
run -s  . You can find more details in the Flink
documentation on [1].

Additionally, information on how to trigger a savepoint can be found in the
section for triggering savepoints [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints

Best,
Junrui

Sachin Mittal wrote on Saturday, May 25, 2024 at 20:35:

> Hi,
> I have a long running yarn cluster and I submit my streaming job using the
> following command:
>
> flink run -m yarn-cluster -yid application_1473169569237_0001
> /usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt
> --output file:///output/
>
> Let's say I want to stop this job, update the jar and some of the input
> arguments, and restart the job from the savepoint. How would I do that?
>
> Would this be the right command?
>
> flink run -s /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab -m
> yarn-cluster -yid application_1473169569237_0001
> /usr/lib/flink/examples/streaming/WordCount-Updated.jar --input
> file:///input1.txt --output file:///output1/ --newarg value123
>
> Thanks
> Sachin
>
>


How to start a flink job on a long running yarn cluster from a checkpoint (with arguments)

2024-05-25 Thread Sachin Mittal
Hi,
I have a long running yarn cluster and I submit my streaming job using the
following command:

flink run -m yarn-cluster -yid application_1473169569237_0001
/usr/lib/flink/examples/streaming/WordCount.jar --input file:///input.txt
--output file:///output/

Let's say I want to stop this job, update the jar and some of the input
arguments, and restart the job from the savepoint. How would I do that?

Would this be the right command?

flink run -s /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab -m
yarn-cluster -yid application_1473169569237_0001
/usr/lib/flink/examples/streaming/WordCount-Updated.jar --input
file:///input1.txt --output file:///output1/ --newarg value123

Thanks
Sachin


Best Practices? Fault Isolation for Processing Large Number of Same-Shaped Input Kafka Topics in a Big Flink Job

2024-05-13 Thread Kevin Lam via user
Hi everyone,

I'm currently prototyping on a project where we need to process a large
number of Kafka input topics (say, a couple of hundred), all of which share
the same DataType/Schema.

Our objective is to run the same Flink SQL on all of the input topics, but
I am concerned about doing this in a single large Flink SQL application for
fault isolation purposes. We'd like to limit the "blast radius" in cases of
data issues or "poison pills" in any particular Kafka topic — meaning, if
one topic runs into a problem, it shouldn’t compromise or halt the
processing of the others.

At the same time, we are concerned about the operational toil associated
with managing hundreds of Flink jobs that are really one logical
application.

Has anyone here tackled a similar challenge? If so:

   1. How did you design your solution to handle a vast number of topics
   without creating a heavy management burden?
   2. What strategies or patterns have you found effective in isolating
   issues within a specific topic so that they do not affect the processing of
   others?
   3. Are there specific configurations or tools within the Flink ecosystem
   that you'd recommend to efficiently manage this scenario?

Any examples, suggestions, or references to relevant documentation would be
helpful. Thank you in advance for your time and help!


Re: Flink job performance

2024-04-15 Thread Kenan Kılıçtepe
How many taskmanagers and server do you have?
Can you also share the task managers page of flink dashboard?


On Mon, Apr 15, 2024 at 10:58 AM Oscar Perez via user 
wrote:

> Hi community!
>
> We have an interesting problem with Flink after increasing parallelism in
> a certain way. Here is the summary:
>
> 1) We identified that our job bottleneck was some co-keyed process
> operators that were causing backpressure on the previous operators.
> 2) What we did was increase the parallelism of all the operators from 6
> to 12, but keep the operators that read from Kafka at 6. The main reason
> was that all our topics have 6 partitions, so increasing their parallelism
> would not yield better performance.
>
> See the attached job layouts before and after the changes.
> What happened was that some operations that were chained in the same
> operator, like read - filter - map - filter, are now rebalanced, and the
> overall performance of the job is suffering (it keeps throwing exceptions
> now and then).
>
> Does the rebalance operation go over the network, or does it happen on the
> same node? How can we effectively improve the performance of this job with
> the given resources?
>
> Thanks for the input!
> Regards,
> Oscar
>
>
>


Re: Flink job performance

2024-04-15 Thread Oscar Perez via user
Hi,
I appreciate your comments and thank you for that. My original question
still remains though. Why the very same job just by changing the settings
aforementioned had this increase in cpu usage and performance degradation
when we should have expected the opposite behaviour?

thanks again,
Oscar

On Mon, 15 Apr 2024 at 15:11, Zhanghao Chen 
wrote:

> The exception basically says the remote TM is unreachable, probably
> terminated due to some other reasons. This may not be the root cause. Is
> there any other exceptions in the log? Also, since the overall resource
> usage is almost full, could you try allocating more CPUs and see if the
> instability persists?
>
> Best,
> Zhanghao Chen
> --
> *From:* Oscar Perez 
> *Sent:* Monday, April 15, 2024 19:24
> *To:* Zhanghao Chen 
> *Cc:* Oscar Perez via user 
> *Subject:* Re: Flink job performance
>
> Hei, ok that is weird. Let me resend them.
>
> Regards,
> Oscar
>
> On Mon, 15 Apr 2024 at 14:00, Zhanghao Chen 
> wrote:
>
> Hi, there seems to be sth wrong with the two images attached in the latest
> email. I cannot open them.
>
> Best,
> Zhanghao Chen
> --
> *From:* Oscar Perez via user 
> *Sent:* Monday, April 15, 2024 15:57
> *To:* Oscar Perez via user ; pi-team <
> pi-t...@n26.com>; Hermes Team 
> *Subject:* Flink job performance
>
> Hi community!
>
> We have an interesting problem with Flink after increasing parallelism in
> a certain way. Here is the summary:
>
> 1) We identified that our job bottleneck was some co-keyed process
> operators that were causing backpressure on the previous operators.
> 2) What we did was increase the parallelism of all the operators from 6
> to 12, but keep the operators that read from Kafka at 6. The main reason
> was that all our topics have 6 partitions, so increasing their parallelism
> would not yield better performance.
>
> See the attached job layouts before and after the changes.
> What happened was that some operations that were chained in the same
> operator, like read - filter - map - filter, are now rebalanced, and the
> overall performance of the job is suffering (it keeps throwing exceptions
> now and then).
>
> Does the rebalance operation go over the network, or does it happen on the
> same node? How can we effectively improve the performance of this job with
> the given resources?
>
> Thanks for the input!
> Regards,
> Oscar
>
>
>


Re: Flink job performance

2024-04-15 Thread Zhanghao Chen
The exception basically says the remote TM is unreachable, probably terminated 
due to some other reasons. This may not be the root cause. Is there any other 
exceptions in the log? Also, since the overall resource usage is almost full, 
could you try allocating more CPUs and see if the instability persists?

Best,
Zhanghao Chen

From: Oscar Perez 
Sent: Monday, April 15, 2024 19:24
To: Zhanghao Chen 
Cc: Oscar Perez via user 
Subject: Re: Flink job performance

Hei, ok that is weird. Let me resend them.

Regards,
Oscar

On Mon, 15 Apr 2024 at 14:00, Zhanghao Chen wrote:
Hi, there seems to be sth wrong with the two images attached in the latest 
email. I cannot open them.

Best,
Zhanghao Chen

From: Oscar Perez via user 
Sent: Monday, April 15, 2024 15:57
To: Oscar Perez via user ; pi-team ; Hermes Team 
Subject: Flink job performance

Hi community!

We have an interesting problem with Flink after increasing parallelism in a 
certain way. Here is the summary:

1) We identified that our job bottleneck was some co-keyed process operators
that were causing backpressure on the previous operators.
2) What we did was increase the parallelism of all the operators from 6 to 12,
but keep the operators that read from Kafka at 6. The main reason was that all
our topics have 6 partitions, so increasing their parallelism would not yield
better performance.

See the attached job layouts before and after the changes.
What happened was that some operations that were chained in the same operator,
like read - filter - map - filter, are now rebalanced, and the overall
performance of the job is suffering (it keeps throwing exceptions now and then).

Does the rebalance operation go over the network, or does it happen on the same
node? How can we effectively improve the performance of this job with the given
resources?

Thanks for the input!
Regards,
Oscar




Re: Flink job performance

2024-04-15 Thread Zhanghao Chen
Hi, there seems to be sth wrong with the two images attached in the latest 
email. I cannot open them.

Best,
Zhanghao Chen

From: Oscar Perez via user 
Sent: Monday, April 15, 2024 15:57
To: Oscar Perez via user ; pi-team ; 
Hermes Team 
Subject: Flink job performance

Hi community!

We have an interesting problem with Flink after increasing parallelism in a 
certain way. Here is the summary:

1) We identified that our job bottleneck was some co-keyed process operators
that were causing backpressure on the previous operators.
2) What we did was increase the parallelism of all the operators from 6 to 12,
but keep the operators that read from Kafka at 6. The main reason was that all
our topics have 6 partitions, so increasing their parallelism would not yield
better performance.

See the attached job layouts before and after the changes.
What happened was that some operations that were chained in the same operator,
like read - filter - map - filter, are now rebalanced, and the overall
performance of the job is suffering (it keeps throwing exceptions now and then).

Does the rebalance operation go over the network, or does it happen on the same
node? How can we effectively improve the performance of this job with the given
resources?

Thanks for the input!
Regards,
Oscar




Re: Flink job performance

2024-04-15 Thread Zhanghao Chen
Hi Oscar,

The rebalance operation goes over the network stack, but it does not necessarily
involve a remote data shuffle. For data shuffling between tasks on the same
node, a local channel is used; compared to chained operators, however, it still
introduces extra data serialization overhead. For data shuffling between tasks
on different nodes, remote network shuffling is involved.

Therefore, breaking the chain with an extra rebalance operation will definitely
add overhead, but it is usually negligible under a small parallelism setting
like yours. Could you share the details of the exceptions thrown after the
change?
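As a small illustration of the chaining point (a hedged sketch with made-up operators,
not your pipeline): operators chain only when, among other things, their parallelism
matches, so giving two directly connected operators different parallelism forces a
rebalance exchange between them.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingVsRebalance {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Equal parallelism: the two filters and the print can be chained into one task,
        // so records are handed over by method calls with no serialization in between.
        env.fromSequence(0, 1_000)
                .filter(i -> i % 2 == 0).setParallelism(6)
                .filter(i -> i % 3 == 0).setParallelism(6)
                .print().setParallelism(6);

        // Different parallelism (6 -> 12): the chain is broken and a rebalance-style
        // exchange is inserted, which serializes records and may cross the network
        // between TaskManagers.
        env.fromSequence(0, 1_000)
                .filter(i -> i % 2 == 0).setParallelism(6)
                .filter(i -> i % 3 == 0).setParallelism(12)
                .print().setParallelism(12);

        env.execute("chaining-vs-rebalance");
    }
}

Comparing the two resulting job graphs in the web UI makes the extra exchange visible.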

From: Oscar Perez via user 
Sent: Monday, April 15, 2024 15:57
To: Oscar Perez via user ; pi-team ; 
Hermes Team 
Subject: Flink job performance

Hi community!

We have an interesting problem with Flink after increasing parallelism in a 
certain way. Here is the summary:

1)  We identified that our job bottleneck were some Co-keyed process operators 
that were affecting on previous operators causing backpressure.
2( What we did was to increase the parallelism to all the operators from 6 to 
12 but keeping 6 these operators that read from kafka. The main reason was that 
all our topics have 6 partitions so increasing the parallelism will not yield 
better performance

See attached job layout prior and after the changes:
What happens was that some operations that were chained in the same operator 
like reading - filter - map - filter now are rebalanced and the overall 
performance of the job is suffering (keeps throwing exceptions now and then)

Is the rebalance operation going over the network or this happens in the same 
node? How can we effectively improve performance of this job with the given 
resources?

Thanks for the input!
Regards,
Oscar




Re: Flink job unable to restore from savepoint

2024-03-27 Thread prashant parbhane
flink version 1.17
Didn't change any job configuration.

We are facing this below issue.
https://issues.apache.org/jira/browse/FLINK-23886

On Wed, Mar 27, 2024 at 1:39 AM Hangxiang Yu  wrote:

> Hi, Prashant.
> Which Flink version did you use?
> And did you modify your job logic or configuration? If yes, could you
> share what changed?
>
> On Wed, Mar 27, 2024 at 3:37 PM prashant parbhane 
> wrote:
>
>> Hello,
>>
>> We have been facing this weird issue of not being able to restore from
>> savepoint, when we have a significant load on flink jobs.
>>
>> "
>> *org.apache.flink.util.FlinkRuntimeException: Error while deserializing
>> the user key.*
>>
>> *at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)*
>>
>> *Caused by: java.io.EOFException*
>>
>> *at
>> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)*
>>
>> *at
>> org.apache.flink.types.StringValue.readString(StringValue.java:781)*
>>
>> *at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)*
>>
>> *at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)*
>>
>> *at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)*
>>
>> *at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)*
>>
>> *at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492*
>> "
>>
>>
>> Any suggestions?
>>
>> Thanks,
>> Prashant
>>
>>
>>
>>
>>
>
> --
> Best,
> Hangxiang.
>


Re: Flink job unable to restore from savepoint

2024-03-27 Thread Yanfei Lei
Hi Prashant,

Compared to the job that generated the savepoint, are there any changes in
the new job? For example, were data fields added or deleted, or was the
type serializer changed?
More detailed JobManager logs may also help.
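To make the serializer case concrete, a hedged illustration (not taken from your job)
of where the "user key" in that stack trace comes from:

import org.apache.flink.api.common.state.MapStateDescriptor;

public class UserKeyCompatibility {
    // The first type parameter of MapState (the "user key", String here) is serialized into
    // the savepoint by the state backend. If the restoring job declares a different user-key
    // type or a changed serializer under the same state name, restore can fail while
    // deserializing the user key, which matches the RocksDBMapState/EOFException trace above.
    static final MapStateDescriptor<String, Long> EXAMPLE_STATE =
            new MapStateDescriptor<>("example-map-state", String.class, Long.class);
}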

prashant parbhane wrote on Wednesday, March 27, 2024 at 14:20:
>
> Hello,
>
> We have been facing this weird issue of not being able to restore from 
> savepoint, when we have a significant load on flink jobs.
>
> "org.apache.flink.util.FlinkRuntimeException: Error while deserializing the 
> user key.
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)
>
> Caused by: java.io.EOFException
>
> at 
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
>
> at org.apache.flink.types.StringValue.readString(StringValue.java:781)
>
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
>
> at 
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)
>
> at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492"
>
>
> Any suggestions?
>
>
> Thanks,
> Prashant
>
>
>
>


-- 
Best,
Yanfei


Re: Flink job unable to restore from savepoint

2024-03-27 Thread Hangxiang Yu
Hi, Prashant.
Which Flink version did you use?
And did you modify your job logic or configuration? If yes, could you
share what changed?

On Wed, Mar 27, 2024 at 3:37 PM prashant parbhane 
wrote:

> Hello,
>
> We have been facing this weird issue of not being able to restore from
> savepoint, when we have a significant load on flink jobs.
>
> "
> *org.apache.flink.util.FlinkRuntimeException: Error while deserializing
> the user key.*
>
> *at
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)*
>
> *Caused by: java.io.EOFException*
>
> *at
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)*
>
> *at
> org.apache.flink.types.StringValue.readString(StringValue.java:781)*
>
> *at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)*
>
> *at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)*
>
> *at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)*
>
> *at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)*
>
> *at
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492*
> "
>
>
> Any suggestions?
>
> Thanks,
> Prashant
>
>
>
>
>

-- 
Best,
Hangxiang.


Flink job unable to restore from savepoint

2024-03-27 Thread prashant parbhane
Hello,

We have been facing this weird issue of not being able to restore from
savepoint, when we have a significant load on flink jobs.

"
*org.apache.flink.util.FlinkRuntimeException: Error while deserializing the
user key.*

*at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:495)*

*Caused by: java.io.EOFException*

*at
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)*

*at org.apache.flink.types.StringValue.readString(StringValue.java:781)*

*at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)*

*at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)*

*at
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:389)*

*at
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:65)*

*at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:492*
"


Any suggestions?

Thanks,
Prashant


Re: Can JobGraph information be obtained from a Flink job's web URL?

2024-03-03 Thread Zhanghao Chen
To add to Yanquan's answer: what you get from /jobs/:jobid/plan is in fact the
JobGraph information represented as JSON (it is generated by the
JsonPlanGenerator class and contains most of the commonly used information in
the JobGraph), so it should meet your needs.
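For reference, a minimal sketch (the REST address and job ID are placeholders) of
fetching that plan over the REST API:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FetchJobPlan {
    public static void main(String[] args) throws Exception {
        // Placeholders: replace with your JobManager's web address and a running job's ID.
        String restAddress = "http://localhost:8081";
        String jobId = "<job-id>";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restAddress + "/jobs/" + jobId + "/plan"))
                .GET()
                .build();

        // The response body is the JSON plan generated from the JobGraph.
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}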

From: casel.chen 
Sent: Saturday, March 2, 2024 14:17
To: user-zh@flink.apache.org 
Subject: Can JobGraph information be obtained from a Flink job's web URL?

Can a running Flink job's JobGraph information be obtained through the web URL it exposes?


Re: Can JobGraph information be obtained from a Flink job's web URL?

2024-03-01 Thread Yanquan Lv
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-plan
Via /jobs/:jobid/plan you can get the ExecutionGraph information; I am not sure whether it contains the information you need.

casel.chen wrote on Saturday, March 2, 2024 at 14:19:

> Can a running Flink job's JobGraph information be obtained through the web URL it exposes?


Can JobGraph information be obtained from a Flink job's web URL?

2024-03-01 Thread casel.chen
Can a running Flink job's JobGraph information be obtained through the web URL it exposes?

New Relic alerts for Flink job submission failure and recovery

2024-02-19 Thread elakiya udhayanan
Hi Team,

I would like to know the possibilities of configuring New Relic alerts
for a Flink job whenever the job is submitted, fails, or recovers
from a failure.
In our case, we have configured the Flink environment as a Kubernetes pod
running on an EKS cluster and the application code is created as a jar and
submitted as a job either from the UI or using the CLI commands.

We have added the following metrics configuration to flink-conf.yaml, but we
are unsure about the next steps; any help is appreciated. Thanks in
advance.

metrics.reporters: prom
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.factory.class:
org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.host: localhost
metrics.reporter.prom.port: 9250-9260


Thanks,
Elakiya


Testing Flink job with bounded input

2024-01-18 Thread Jan Lukavský

Hi,

I have a question about how to correctly set up a test that will read
input from a locally provided collection in bounded mode and produce
outputs at the end of the computation. My test case looks something like
the following:


String[] lines = ...;
try (StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment()) {
  env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream input = env.fromCollection(Arrays.asList(lines));
  // the following includes keyBy().window().aggregate()
  DataStream output = TopWords.getTopWords(input, 5);
  // sink that collects outputs to a (static) list
  SinkFunction sink = TestSink.create();
  output.addSink(sink);
  env.execute();
  List collected =
      TestSink.outputs(sink).stream().flatMap(Streams::stream).collect(Collectors.toList());
  // collected is empty
}

When I remove the window() and aggregations I can see outputs at the
sink. I tried creating a manual trigger for the window; I can see the
onElement() methods being called, but onEventTime() is never called. Also, the
FromElementsFunction never explicitly emits the max watermark at the end of
reading the input. It seems that the Flink runtime does not emit the final
watermark either (I cannot see any traces of watermark emission in the
logs). It seems that I'm doing something obviously wrong; I just cannot
figure out how to emit the final watermark. Adding autoWatermarkInterval
did not help either.
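One thing that is easy to overlook in this kind of test (a hedged sketch, not
necessarily the root cause here): fromCollection assigns no event timestamps, so
event-time windows usually need an explicit WatermarkStrategy with a timestamp
assigner before the keyBy().window() step, for example:

import java.time.Duration;
import java.util.Arrays;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimestampedCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String[] lines = {"a", "bb", "ccc"};

        // Attach event timestamps (here a made-up timestamp derived from the element)
        // so that the event-time window has something to work with.
        DataStream<String> input = env
                .fromCollection(Arrays.asList(lines))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((line, previous) -> line.length() * 1000L));

        input.print();
        env.execute("timestamped-collection");
    }
}

In BATCH execution mode the runtime then emits a single MAX_WATERMARK at the end of
the input, which should fire the remaining event-time windows.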


The test seems to behave the same for Flink versions 1.16.3 and 1.18.1.

Thanks in advance for any pointers.

 Jan


RE: Issue with Flink Job when Reading Data from Kafka and Executing SQL Query (q77 TPC-DS)

2024-01-03 Thread Schwalbe Matthias
Hi Vladimir,

I might be mistaken, here my observations:


  *   List res = CollectionUtil.iteratorToList(result.execute().collect());
      will block until the job is finished
  *   However, this is an unbounded streaming job, which will not finish until
      you cancel it
  *   If you just want to print results, the print sink will do
  *   You might want to directly iterate on the iterator returned by
      result.execute().collect()
  *   And make sure to close/dispose of the iterator once done (see the sketch
      below)
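A minimal sketch of the last two points (the Table handle and the row limit are
placeholders): result.execute().collect() returns a CloseableIterator that can be
consumed incrementally and closed with try-with-resources.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class StreamResults {
    // Iterate the streaming result incrementally instead of materializing it into a List,
    // and let try-with-resources close the iterator once enough rows have been seen.
    static void printSome(Table result, int maxRows) throws Exception {
        TableResult tableResult = result.execute();
        try (CloseableIterator<Row> it = tableResult.collect()) {
            int count = 0;
            while (it.hasNext() && count < maxRows) {
                System.out.println(it.next());
                count++;
            }
        }
    }
}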

Sincere greetings
Thias

From: Alexey Novakov via user 
Sent: Tuesday, January 2, 2024 12:05 PM
To: Вова Фролов 
Cc: user@flink.apache.org
Subject: Re: Issue with Flink Job when Reading Data from Kafka and Executing 
SQL Query (q77 TPC-DS)



Hi Vladimir,

As I see, your SQL query is reading data from the Kafka topic and pulls all 
data to the client side. The "*.collect" method is quite network/memory 
intensive. You probably do not want that.

If you want to debug/print the ingested data via SQL, I would recommend the 
"print" connector. 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/print/
It means you could INSERT from SELECT to the print table.
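For illustration, a minimal sketch of that pattern (the source definition and column
names below are placeholders, not the actual q77 tables):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PrintSinkDebug {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder source; in the real job this would be the Kafka-backed store_sales_kafka table.
        tEnv.executeSql("CREATE TABLE src (ss_sold_date_sk INT, ss_net_profit DECIMAL(7, 2)) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");

        tEnv.executeSql("CREATE TABLE debug_print (ss_sold_date_sk INT, ss_net_profit DECIMAL(7, 2)) "
                + "WITH ('connector' = 'print')");

        // The INSERT runs as its own job and prints rows on the TaskManagers,
        // so nothing is pulled back to the client through collect().
        tEnv.executeSql("INSERT INTO debug_print SELECT * FROM src");
    }
}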

Also, could it be that Flink becomes silent because it has read all data from 
the Kafka topic and just waits for any new record to be inserted to the topic?
Although I would not expect those Node - 1 disconnected messages in such a 
scenario.

Alexey

On Tue, Dec 19, 2023 at 10:40 AM Вова Фролов wrote:
Hello Flink Community,
I am writing to you about an issue I have encountered while using Apache Flink 
version 1.17.1. In my Flink Job, I am using Kafka version 3.6.0 to ingest data 
from TPC-DS(current tpcds100 target size tpcds1), and then I am executing 
SQL queries, specifically, the q77 query, on the data in Kafka.
Versions of Components in Use:

•Apache Flink: 1.17.1

•Kafka: 3.6.0
Kafka Settings: (kafka cluster consists of 9 topics and each has: 512 
partitions, replication factor 3)

•num.network.threads=12

•num.io.threads=10

•socket.send.buffer.bytes=2097152

•socket.request.max.bytes=1073741824
Flink Job Code:
Creating tables with Kafka connector:
public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE 
store_sales_kafka(\n" +
"  ss_sold_date_sk INTEGER,\n" +

// here are 21 columns

"  ss_net_profit DECIMAL(7, 2)\n" +


") WITH (\n" +

"   'connector' = 'kafka',\n" +

"   'key.format' = 'avro',\n" +

"   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +

"   'properties.group.id<http://properties.group.id>' = 'store_sales_group',\n" 
+

"   'scan.startup.mode' = 'earliest-offset',\n" +

"   'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, 
xyz4:9092, xyz5:9092',\n" +

"   'topic' = 'storeSales100',\n" +

"'value.format' = 'avro',\n" +

"'value.fields-include' = 'EXCEPT_KEY'\n" +

"   );";

Q77 with Flink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
List res = CollectionUtil.iteratorToList(result.execute().collect());
for (Row row : res) {
    System.out.println(row);
}


Flink Job Configuration:
I tried several configurations, but here are the main ones:

1. slots per TaskManager 10, parallelism 100;

2. slots per TaskManager 5, parallelism 50;

3. slots per TaskManager 15, parallelism 375;
The result is always about the same
Logs and Errors:
The logs from my Flink Job do not contain any errors.
Description of the Issue:
    The Flink Job runs smoothly for approximately 5 minutes, during which 
data processing and transformations occur as expected. However, after this 
initial period, the Flink Job seems to enter a state where no further changes 
or updates are observed in the processed data. In the logs I see a message:
 “
2023-12-18 INFO  org.apache.kafka.clients.NetworkClient   
[] - [AdminClient clientId=store_group-enumerator-admin-client] Node -1 
disconnected
“
, that is written every 5 minutes
It's worth noting that, despite the lack of errors in the logs, the 
Flink Job essentially becomes unresponsive or ceases to make progress, 
resulting in a stagnation of data processing.
This behavior is consistent across different configurations tested, 
including variations in the number of slots per TaskManager and parallelism.
While the logs do not indicate any errors, they do not provide insights 
into the reason behind the observed data processing stagnation.


Re: Issue with Flink Job when Reading Data from Kafka and Executing SQL Query (q77 TPC-DS)

2024-01-02 Thread Alexey Novakov via user
Hi Vladimir,

As I see, your SQL query is reading data from the Kafka topic and pulls all
data to the client side. The "*.collect" method is quite network/memory
intensive. You probably do not want that.

If you want to debug/print the ingested data via SQL, I would recommend the
"print" connector.
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/print/

It means you could INSERT from SELECT to the print table.

Also, could it be that Flink becomes silent because it has read all data
from the Kafka topic and just waits for any new record to be inserted to
the topic?
Although I would not expect those *Node - 1 disconnected* messages in such
a scenario.

Alexey

On Tue, Dec 19, 2023 at 10:40 AM Вова Фролов 
wrote:

> Hello Flink Community,
>
> I am writing to you about an issue I have encountered while using Apache
> Flink version 1.17.1. In my Flink Job, I am using Kafka version 3.6.0 to
> ingest data from TPC-DS(current tpcds100 target size tpcds1), and then
> I am executing SQL queries, specifically, the q77 query, on the data in
> Kafka.
>
> *Versions of Components in Use:*
>
> ·Apache Flink: 1.17.1
>
> ·Kafka: 3.6.0
>
> Kafka Settings: (kafka cluster consists of 9 topics and each has: 512
> partitions, replication factor 3)
>
> ·num.network.threads=12
>
> ·num.io.threads=10
>
> ·socket.send.buffer.bytes=2097152
>
> ·socket.request.max.bytes=1073741824
>
> *Flink Job Code:*
>
> Creating tables with Kafka connector:
>
> public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE
> store_sales_kafka(\n" +
> "  ss_sold_date_sk INTEGER,\n" +
>
> // here are 21 columns
>
> "  ss_net_profit DECIMAL(7, 2)\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'key.format' = 'avro',\n" +
> "   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
> "   'properties.group.id' = 'store_sales_group',\n" +
> "   'scan.startup.mode' = 'earliest-offset',\n" +
> "   'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
> "   'topic' = 'storeSales100',\n" +
> "   'value.format' = 'avro',\n" +
> "   'value.fields-include' = 'EXCEPT_KEY'\n" +
> "   );";
>
>
>
> Q77 with Flink
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
> Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
> List res = CollectionUtil.iteratorToList(result.execute().collect());
> for (Row row : res) {
>     System.out.println(row);
> }
>
>
>
>
>
> *Flink Job Configuration:*
>
> I tried several configurations, but here are the main ones:
>
> 1. slots per TaskManager 10, parallelism 100;
>
> 2. slots per TaskManager 5, parallelism 50;
>
> 3. slots per TaskManager 15, parallelism 375;
>
> The result is always about the same
>
> *Logs and Errors:*
>
> The logs from my Flink Job do not contain any errors.
>
> *Description of the Issue:*
>
> The Flink Job runs smoothly for approximately 5 minutes, during
> which data processing and transformations occur as expected. However, after
> this initial period, the Flink Job seems to enter a state where no further
> changes or updates are observed in the processed data. In the logs I see a
> message:
>
>  “
>
> 2023-12-18 INFO  org.apache.kafka.clients.NetworkClient
> [] - [AdminClient clientId=store_group-enumerator-admin-client] Node -
> 1 disconnected
>
> “
>
> , that is written every 5 minutes
>
> It's worth noting that, despite the lack of errors in the logs,
> the Flink Job essentially becomes unresponsive or ceases to make progress,
> resulting in a stagnation of data processing.
>
> This behavior is consistent across different configurations
> tested, including variations in the number of slots per TaskManager and
> parallelism.
>
> While the logs do not indicate any errors, they do not provide
> insights into the reason behind the observed data processing stagnation.
>
>
>
> Cluster consists of 5 machines and each has:
>
> ·2 CPU x86-64 20 cores, 40 threads, 2200 MHz base frequency, 3600
> MHz max turbo frequency. 40 cores, 80 threads total on each machine.
>
> ·RAM 768GB, up to 640GB is available for Flink.
>
> ·2 network cards 10 Gigabit each
>
> ·10 HDD 5.5 TB
>
>
>
> This issue significantly hinders the overall effectiveness of utilizing
> Apache Flink for my data processing needs. I am seeking guidance to
> understand and resolve the underlying cause of this behavior.
>
>
>
> I am looking forward to receiving your advice. Please let me know if you
> need additional details.
>
>
>
> Kind regards,
>
> Vladimir
>


Issue with Flink Job when Reading Data from Kafka and Executing SQL Query (q77 TPC-DS)

2023-12-19 Thread Вова Фролов
Hello Flink Community,

I am writing to you about an issue I have encountered while using Apache
Flink version 1.17.1. In my Flink Job, I am using Kafka version 3.6.0 to
ingest data from TPC-DS(current tpcds100 target size tpcds1), and then
I am executing SQL queries, specifically, the q77 query, on the data in
Kafka.

*Versions of Components in Use:*

·Apache Flink: 1.17.1

·Kafka: 3.6.0

Kafka Settings: (kafka cluster consists of 9 topics and each has: 512
partitions, replication factor 3)

·num.network.threads=12

·num.io.threads=10

·socket.send.buffer.bytes=2097152

·socket.request.max.bytes=1073741824

*Flink Job Code:*

Creating tables with Kafka connector:

public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE
store_sales_kafka(\n" +
"  ss_sold_date_sk INTEGER,\n" +

// here are 21 columns

"  ss_net_profit DECIMAL(7, 2)\n" +
") WITH (\n" +"   'connector' = 'kafka',\n" +"   'key.format' =
'avro',\n" +"   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +"
'properties.group.id' = 'store_sales_group',\n" +"
'scan.startup.mode' = 'earliest-offset',\n" +"
'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092,
xyz4:9092, xyz5:9092',\n" +"   'topic' = 'storeSales100',\n" +
"'value.format' = 'avro',\n" +
"'value.fields-include' = 'EXCEPT_KEY'\n" +

"   );";



Q77 with Flink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());
for (Row row : res) {
    System.out.println(row);
}





*Flink Job Configuration:*

I tried several configurations, but here are the main ones:

1. slots per TaskManager 10, parallelism 100;

2. slots per TaskManager 5, parallelism 50;

3.     slots per TaskManager 15, parallelism 375;

The result is always about the same

*Logs and Errors:*

The logs from my Flink Job do not contain any errors.

*Description of the Issue:*

The Flink Job runs smoothly for approximately 5 minutes, during
which data processing and transformations occur as expected. However, after
this initial period, the Flink Job seems to enter a state where no further
changes or updates are observed in the processed data. In the logs I see a
message:

 “

2023-12-18 INFO  org.apache.kafka.clients.NetworkClient
  [] - [AdminClient clientId=store_group-enumerator-admin-client] Node -1
disconnected

“

, that is written every 5 minutes

It's worth noting that, despite the lack of errors in the logs, the
Flink Job essentially becomes unresponsive or ceases to make progress,
resulting in a stagnation of data processing.

This behavior is consistent across different configurations tested,
including variations in the number of slots per TaskManager and parallelism.

While the logs do not indicate any errors, they do not provide
insights into the reason behind the observed data processing stagnation.



Cluster consists of 5 machines and each has:

·2 CPU x86-64 20 cores, 40 threads, 2200 MHz base frequency, 3600
MHz max turbo frequency. 40 cores, 80 threads total on each machine.

·RAM 768GB, up to 640GB is available for Flink.

·2 network cards 10 Gigabit each

·10 HDD 5.5 TB



This issue significantly hinders the overall effectiveness of utilizing
Apache Flink for my data processing needs. I am seeking guidance to
understand and resolve the underlying cause of this behavior.



I am looking forward to receiving your advice. Please let me know if you
need additional details.



Kind regards,

Vladimir


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-07 Thread elakiya udhayanan
Hi Chen/ Feng,

Thanks for pointing out the mistake I made, after correcting the query I am
able to run the job with two sinks successfully.

Thanks,
Elakiya

On Thu, Dec 7, 2023 at 4:37 AM Chen Yu  wrote:

> Hi  Chen,
> You should tell Flink which table to insert into with "INSERT INTO XXX SELECT
> XXX".
>
> For a single non-insert query, Flink will collect the output to the console
> automatically, so it also works without adding an INSERT.
>
> But you must specify the target table explicitly when you need to write
> data to external storage.
>
> Like:
>
> String relateQuery = "INSERT INTO xxx SELECT correlator_id, name,
> relationship FROM Correlation";
>
>
> Best,
> Yu Chen
>
>
> Get Outlook for iOS <https://aka.ms/o0ukef>
> --
> *From:* Zhanghao Chen 
> *Sent:* Wednesday, December 6, 2023 7:21:50 PM
> *To:* elakiya udhayanan ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Query on using two sinks for a Flink job (Flink SQL)
>
> Hi Elakiya,
>
> You can try executing TableEnvironmentImpl#executeInternal for non-insert
> statements, then using StatementSet.addInsertSql to add multiple insert
> statements, and finally calling StatementSet#execute.
>
> Best,
> Zhanghao Chen
> --
> *From:* elakiya udhayanan 
> *Sent:* Wednesday, December 6, 2023 17:49
> *To:* user@flink.apache.org 
> *Subject:* Query on using two sinks for a Flink job (Flink SQL)
>
> Hi Team,
>  I would like to know the possibility of having two sinks in a
> single Flink job. In my case I am using the Flink SQL based job where I try
> to consume from two different Kafka topics using the create table (as
> below) DDL and then use a join condition to correlate them and at present
> write it to an external database (PostgreSQL - as a sink). I would like to
> know if I can add another sink where I want to also write it to kafka topic
> (as the second sink).
> I tried using two sql scripts (two create and two insert for the same) but
> was facing an exception* "Cannot have more than one execute() or
> executeAsync() call in a single environment. at "*
> Also tried to use the StatementSet functionality which again gave me an
> exception *"org.apache.flink.table.api.TableException: Only insert
> statement is supported now. at ".*
> I am looking for some help in regards to this. TIA
>
> *Note:* I am using the Flink UI to submit my job.
>
> *Sample DDL statement used: *String statement = "CREATE TABLE Person
> (\r\n" +
> "  person ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
>
> Thanks,
> Elakiya
>


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Chen Yu
Hi  Chen,
You should tell Flink which table to insert into with "INSERT INTO XXX SELECT XXX".

For a single non-insert query, Flink will collect the output to the console
automatically, so it also works without adding an INSERT.

But you must specify the target table explicitly when you need to write data to
external storage.

Like:

String relateQuery = "INSERT INTO xxx SELECT correlator_id, name, relationship
FROM Correlation";


Best,
Yu Chen

Get Outlook for iOS<https://aka.ms/o0ukef>

From: Zhanghao Chen 
Sent: Wednesday, December 6, 2023 7:21:50 PM
To: elakiya udhayanan ; user@flink.apache.org 

Subject: Re: Query on using two sinks for a Flink job (Flink SQL)

Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert 
statements, then using StatementSet.addInsertSql to add multiple insert
statements, and finally calling StatementSet#execute.

Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Feng Jin
Hi Elakiya,


You should use DML in the statement set  instead of DQL .


Here is a simple example:

executeSql("CREATE TABLE source_table1 ..");

executeSql("CREATE TABLE source_table2 ..");

executeSql("CREATE TABLE sink_table1 ..");

executeSql("CREATE TABLE sink_table1 ..");


stmtSet.addInsertSql("INSERT INTO sink_tabl1 SELECT xxx from  source_table1
join source_table2 ...");

stmtSet.addInsertSql("INSERT INTO sink_tabl2 SELECT xxx from  source_table1
join source_table2 ...");

stmtSet.execute();


Best,
Feng


On Thu, Dec 7, 2023 at 12:48 AM elakiya udhayanan 
wrote:

> Hi Xuyang, Zhangao,
>
> Thanks for your response, I have attached sample job files that I tried
> with the Statementset and with two queries. Please let me know if you are
> able to point out where I am possibly going wrong.
>
> Thanks,
> Elakiya
>
> On Wed, Dec 6, 2023 at 4:51 PM Xuyang  wrote:
>
>> Hi, Elakiya.
>> Are you following the example here[1]? Could you attach a minimal,
>> reproducible SQL?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>>
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:
>>
>> Hi Team,
>>  I would like to know the possibility of having two sinks in a
>> single Flink job. In my case I am using the Flink SQL based job where I try
>> to consume from two different Kafka topics using the create table (as
>> below) DDL and then use a join condition to correlate them and at present
>> write it to an external database (PostgreSQL - as a sink). I would like to
>> know if I can add another sink where I want to also write it to kafka topic
>> (as the second sink).
>> I tried using two sql scripts (two create and two insert for the same)
>> but was facing an exception* "Cannot have more than one execute() or
>> executeAsync() call in a single environment. at "*
>> Also tried to use the StatementSet functionality which again gave me an
>> exception *"org.apache.flink.table.api.TableException: Only insert
>> statement is supported now. at ".*
>> I am looking for some help in regards to this. TIA
>>
>> *Note:* I am using the Flink UI to submit my job.
>>
>> *Sample DDL statement used:*String statement = "CREATE TABLE Person
>> (\r\n" +
>> "  person ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
>> +
>> ")";
>>
>> Thanks,
>> Elakiya
>>
>>


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread elakiya udhayanan
Hi Xuyang, Zhangao,

Thanks for your response, I have attached sample job files that I tried
with the Statementset and with two queries. Please let me know if you are
able to point out where I am possibly going wrong.

Thanks,
Elakiya

On Wed, Dec 6, 2023 at 4:51 PM Xuyang  wrote:

> Hi, Elakiya.
> Are you following the example here[1]? Could you attach a minimal,
> reproducible SQL?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>
>
>
> --
> Best!
> Xuyang
>
>
> At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:
>
> Hi Team,
>  I would like to know the possibility of having two sinks in a
> single Flink job. In my case I am using the Flink SQL based job where I try
> to consume from two different Kafka topics using the create table (as
> below) DDL and then use a join condition to correlate them and at present
> write it to an external database (PostgreSQL - as a sink). I would like to
> know if I can add another sink where I want to also write it to kafka topic
> (as the second sink).
> I tried using two sql scripts (two create and two insert for the same) but
> was facing an exception* "Cannot have more than one execute() or
> executeAsync() call in a single environment. at "*
> Also tried to use the StatementSet functionality which again gave me an
> exception *"org.apache.flink.table.api.TableException: Only insert
> statement is supported now. at ".*
> I am looking for some help in regards to this. TIA
>
> *Note:* I am using the Flink UI to submit my job.
>
> *Sample DDL statement used:*String statement = "CREATE TABLE Person
> (\r\n" +
> "  person ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
>
> Thanks,
> Elakiya
>
>


TwoSinks.java
Description: Binary data


TwoSinks2.java
Description: Binary data


Re:Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Xuyang
Hi, Elakiya.
Are you following the example here[1]? Could you attach a minimal, reproducible 
SQL?


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/







--

Best!
Xuyang




At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA


Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya

Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Zhanghao Chen
Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert
statements, then using StatementSet.addInsertSql to add multiple insert
statements, and finally calling StatementSet#execute.
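A minimal end-to-end sketch of that pattern (the connectors and table names here are
placeholders, not the actual upsert-kafka and PostgreSQL tables):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoSinksSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL statements go through executeSql(); they do not count as job submissions.
        tEnv.executeSql("CREATE TABLE src (id STRING, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE pg_sink (id STRING, name STRING) WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TABLE kafka_sink (id STRING, name STRING) WITH ('connector' = 'blackhole')");

        // Both INSERTs are added to one StatementSet and submitted as a single job,
        // which avoids the "Cannot have more than one execute()" error.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO pg_sink SELECT id, name FROM src");
        set.addInsertSql("INSERT INTO kafka_sink SELECT id, name FROM src");
        set.execute();
    }
}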

Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya


Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread elakiya udhayanan
Hi Team,
 I would like to know the possibility of having two sinks in a single Flink
job. In my case I am using the Flink SQL based job where I try to consume
from two different Kafka topics using the create table (as below) DDL and
then use a join condition to correlate them and at present write it to an
external database (PostgreSQL - as a sink). I would like to know if I can
add another sink where I want to also write it to kafka topic (as the
second sink).
I tried using two sql scripts (two create and two insert for the same) but
was facing an exception* "Cannot have more than one execute() or
executeAsync() call in a single environment. at "*
Also tried to use the StatementSet functionality which again gave me an
exception *"org.apache.flink.table.api.TableException: Only insert
statement is supported now. at ".*
I am looking for some help in regards to this. TIA

*Note:* I am using the Flink UI to submit my job.

*Sample DDL statement used:*String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
+
")";

Thanks,
Elakiya


Re: Flink Job Failed With Kafka Exception

2023-11-08 Thread Feng Jin
Hi Madan,

Perhaps you can filter out inactive topics in the client first and then
pass the filtered list of topics to KafkaConsumer.
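A rough sketch of that idea (the broker address, topic list, and the notion of
"inactive" are all assumptions; the real check depends on how the producer shuts a
topic down):

import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilteredTopicsJob {
    public static void main(String[] args) throws Exception {
        List<String> wantedTopics = List.of("topic-a", "topic-b", "topic-c"); // placeholder list

        // Ask the brokers which topics still exist and keep only those.
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        Set<String> existing;
        try (AdminClient admin = AdminClient.create(props)) {
            existing = admin.listTopics().names().get();
        }
        List<String> activeTopics =
                wantedTopics.stream().filter(existing::contains).collect(Collectors.toList());

        // Build the source only from the topics that passed the check.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics(activeTopics)
                .setGroupId("filtered-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "filtered-kafka").print();
        env.execute("filtered-topics-job");
    }
}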

Best,
Feng


On Tue, Nov 7, 2023 at 10:42 AM Madan D via user 
wrote:

> Hello Hang/Lee,
> Thanks!
> In my use case we listen to multiple topics, but in a few cases one of the
> topics may become inactive if the producer decides to shut it down, while the
> other topics still receive data. What we observe is that if one of the
> topics becomes inactive, the entire Flink application fails due to a timeout
> while getting metadata. We would like the Flink job to continue consuming
> data from the other source topics even if one of the topics has an issue,
> since failing the entire Flink application doesn't make sense when only one
> topic has a problem.
>
>
>
> Regards,
> Madan
>
> On Nov 5, 2023, at 11:29 PM, Hang Ruan  wrote:
>
> 
> Hi, Madan.
>
> This error suggests that there are some problems when the consumer tries
> to read the topic metadata. If you use the same source for these topics,
> the Kafka connector cannot skip one of them. As you say, you need to modify
> the connector's default behavior.
> Maybe you should read the code in KafkaSourceEnumerator to skip this error.
>
> Best,
> Hang
>
Junrui Lee wrote on Monday, November 6, 2023 at 14:30:
>
>> Hi Madan,
>>
>> Do you mean you want to restart only the failed tasks, rather than
>> restarting the entire pipeline region? As far as I know, currently Flink
>> does not support task-level restart, but requires restarting the pipeline
>> region.
>>
>> Best,
>> Junrui
>>
Madan D via user wrote on Wednesday, October 11, 2023 at 12:37:
>>
>>> Hello Team,
>>> We are running a Flink pipeline that consumes data from multiple
>>> topics, but we recently encountered that if one topic has issues
>>> with participation, etc., the whole Flink pipeline fails, which
>>> affects the other topics. Is there a way we can make the Flink pipeline keep
>>> running even after one of the topics has an issue? We tried to handle
>>> exceptions to make sure the job wouldn't fail, but it didn't help.
>>>
>>> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
>>>
>>>
>>> Can you please provide any insights?
>>>
>>>
>>> Regards,
>>> Madan
>>>
>>


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-07 Thread arjun s
Hi team,
Thank you for your response. Could you please provide a sample
regex (source.path.regex-pattern) for the following scenarios?

Matching filenames that start with "flink", e.g. flink_2023_11_08.csv
Matching filenames that end with "flink.csv", e.g.
customer_2023_11_08_flink.csv

Thanks and regards,
Arjun

On Tue, 7 Nov 2023 at 16:00, Yu Chen  wrote:

> Hi Arjun,
>
> As stated in the document, 'This regex pattern should be matched with the
> absolute file path.'
> Therefore, you should adjust your regular expression to match absolute
> paths.
>
> Please let me know if there are any other problems.
>
> Best,
> Yu Chen
>
> > On Nov 7, 2023 at 18:11, arjun s wrote:
> >
> > Hi Chen,
> > I attempted to configure the 'source.path.regex-pattern' property in the
> table settings as '^customer.*' to ensure that the Flink job only processes
> file names starting with "customer" in the specified directory. However, it
> appears that this configuration is not producing the expected results. Are
> there any additional configurations or adjustments that need to be made?
> The table script I used is as follows:
> > CREATE TABLE sample (
> >   col1 STRING,
> >   col2 STRING,
> >   col3 STRING,
> >   col4 STRING,
> >   file.path STRING NOT NULL METADATA
> > ) WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'file:///home/techuser/inputdata',
> >   'format' = 'csv',
> >   'source.path.regex-pattern' = '^customer.*',
> >   'source.monitor-interval' = '1'
> > )
> > Thanks in advance,
> > Arjun
> >
> > On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:
> > Hi Arjun,
> >
> > If you can filter files by a regex pattern, I think the config
> `source.path.regex-pattern`[1] maybe what you want.
> >
> >   'source.path.regex-pattern' = '...',  -- optional: regex pattern to
> filter files to read under the
> > -- directory of `path` option.
> This regex pattern should be
> > -- matched with the absolute
> file path. If this option is set,
> > -- the connector  will recursive
> all files under the directory
> > -- of `path` option
> >
> > Best,
> > Yu Chen
> >
> >
> > [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
> >
> > 发件人: arjun s 
> > 发送时间: 2023年11月6日 20:50
> > 收件人: user@flink.apache.org 
> > 主题: Handling Schema Variability and Applying Regex Patterns in Flink Job
> Configuration   Hi team,
> > I'm currently utilizing the Table API function within my Flink job, with
> the objective of reading records from CSV files located in a source
> directory. To obtain the file names, I'm creating a table and specifying
> the schema using the Table API in Flink. Consequently, when the schema
> matches, my Flink job successfully submits and executes as intended.
> However, in cases where the schema does not match, the job fails to submit.
> Given that the schema of the files in the source directory is
> unpredictable, I'm seeking a method to handle this situation.
> > Create table query
> > =
> > CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4
> STRING,file.path` STRING NOT NULL METADATA) WITH ('connector' =
> 'filesystem','path' = 'file:///home/techuser/inputdata','format' =
> 'csv','source.monitor-interval' = '1')
> > =
> >
> > Furthermore, I have a question about whether there's a way to read files
> from the source directory based on a specific regex pattern. This is
> relevant in our situation because only file names that match a particular
> pattern need to be processed by the Flink job.
> >
> > Thanks and Regards,
> > Arjun
>
>


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-07 Thread Yu Chen
Hi Arjun,

As stated in the document, 'This regex pattern should be matched with the 
absolute file path.'
Therefore, you should adjust your regular expression to match absolute paths.
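
For example (a hedged sketch based on your earlier DDL; it assumes the CSV files sit
under file:///home/techuser/inputdata), anchoring the pattern to the whole path rather
than just the file name would look like:

CREATE TABLE sample (
  col1 STRING,
  col2 STRING,
  col3 STRING,
  col4 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///home/techuser/inputdata',
  'format' = 'csv',
  -- '.*/customer[^/]*' matches any absolute path whose file name starts with "customer"
  'source.path.regex-pattern' = '.*/customer[^/]*',
  'source.monitor-interval' = '1'
)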

Please let me know if there are any other problems.

Best,
Yu Chen

> 2023年11月7日 18:11,arjun s  写道:
> 
> Hi Chen,
> I attempted to configure the 'source.path.regex-pattern' property in the 
> table settings as '^customer.*' to ensure that the Flink job only processes 
> file names starting with "customer" in the specified directory. However, it 
> appears that this configuration is not producing the expected results. Are 
> there any additional configurations or adjustments that need to be made? The 
> table script I used is as follows:
> CREATE TABLE sample (
>   col1 STRING,
>   col2 STRING,
>   col3 STRING,
>   col4 STRING,
>   file.path STRING NOT NULL METADATA
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'file:///home/techuser/inputdata',
>   'format' = 'csv',
>   'source.path.regex-pattern' = '^customer.*',
>   'source.monitor-interval' = '1'
> )
> Thanks in advance,
> Arjun
> 
> On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:
> Hi Arjun,
> 
> If you can filter files by a regex pattern, I think the config 
> `source.path.regex-pattern`[1] maybe what you want.
> 
>   'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter 
> files to read under the 
> -- directory of `path` option. This 
> regex pattern should be
> -- matched with the absolute file 
> path. If this option is set,
> -- the connector  will recursive all 
> files under the directory
> -- of `path` option
> 
> Best,
> Yu Chen
> 
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
> 
> 发件人: arjun s 
> 发送时间: 2023年11月6日 20:50
> 收件人: user@flink.apache.org 
> 主题: Handling Schema Variability and Applying Regex Patterns in Flink Job 
> Configuration   Hi team,
> I'm currently utilizing the Table API function within my Flink job, with the 
> objective of reading records from CSV files located in a source directory. To 
> obtain the file names, I'm creating a table and specifying the schema using 
> the Table API in Flink. Consequently, when the schema matches, my Flink job 
> successfully submits and executes as intended. However, in cases where the 
> schema does not match, the job fails to submit. Given that the schema of the 
> files in the source directory is unpredictable, I'm seeking a method to 
> handle this situation.
> Create table query
> =
> CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4 
> STRING,file.path` STRING NOT NULL METADATA) WITH ('connector' = 
> 'filesystem','path' = 'file:///home/techuser/inputdata','format' = 
> 'csv','source.monitor-interval' = '1')
> =
> 
> Furthermore, I have a question about whether there's a way to read files from 
> the source directory based on a specific regex pattern. This is relevant in 
> our situation because only file names that match a particular pattern need to 
> be processed by the Flink job.
> 
> Thanks and Regards,
> Arjun



Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-07 Thread arjun s
Hi Chen,
I attempted to configure the 'source.path.regex-pattern' property in the
table settings as '^customer.*' to ensure that the Flink job only processes
file names starting with "customer" in the specified directory. However, it
appears that this configuration is not producing the expected results. Are
there any additional configurations or adjustments that need to be made?
The table script I used is as follows:
CREATE TABLE sample (
  col1 STRING,
  col2 STRING,
  col3 STRING,
  col4 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///home/techuser/inputdata',
  'format' = 'csv',
  'source.path.regex-pattern' = '^customer.*',
  'source.monitor-interval' = '1'
)
Thanks in advance,
Arjun

On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:

> Hi Arjun,
>
> If you can filter files by a regex pattern, I think the config
> `source.path.regex-pattern`[1] maybe what you want.
>
>   'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter 
> files to read under the -- directory 
> of `path` option. This regex pattern should be
> -- matched with the absolute file path. If this option is set,
> -- the connector  will recursive all files 
> under the directory-- of `path` option
>
>
> Best,
> Yu Chen
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
>
> --
> *发件人:* arjun s 
> *发送时间:* 2023年11月6日 20:50
> *收件人:* user@flink.apache.org 
> *主题:* Handling Schema Variability and Applying Regex Patterns in Flink
> Job Configuration
>
> Hi team,
> I'm currently utilizing the Table API function within my Flink job, with
> the objective of reading records from CSV files located in a source
> directory. To obtain the file names, I'm creating a table and specifying
> the schema using the Table API in Flink. Consequently, when the schema
> matches, my Flink job successfully submits and executes as intended.
> However, in cases where the schema does not match, the job fails to submit.
> Given that the schema of the files in the source directory is
> unpredictable, I'm seeking a method to handle this situation.
> Create table query
> =
> CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4
> STRING,file.path` STRING NOT NULL METADATA) WITH ('connector' =
> 'filesystem','path' = 'file:///home/techuser/inputdata','format' =
> 'csv','source.monitor-interval' = '1')
> =
>
> Furthermore, I have a question about whether there's a way to read files
> from the source directory based on a specific regex pattern. This is
> relevant in our situation because only file names that match a particular
> pattern need to be processed by the Flink job.
>
> Thanks and Regards,
> Arjun
>


Re: Flink Job Failed With Kafka Exception

2023-11-06 Thread Madan D via user
Hello Hang/Lee,

Thanks! In my use case we listen to multiple topics, but in a few cases one of the
topics may become inactive if its producer decides to shut it down while the other
topics are still receiving data. What we observe is that if one of the topics becomes
inactive, the entire Flink application fails due to a timeout while getting metadata.
We would like the Flink job to continue consuming data from the other source topics
even if one topic has an issue, since failing the entire Flink application doesn't
make sense when only one topic is affected.

Regards,
Madan

On Nov 5, 2023, at 11:29 PM, Hang Ruan  wrote:

> Hi, Madan.
>
> This error seems to indicate that there are some problems when the consumer tries
> to read the topic metadata. If you use the same source for these topics, the kafka
> connector cannot skip one of them. As you say, you need to modify the connector's
> default behavior.
> Maybe you should read the code in KafkaSourceEnumerator to skip this error.
>
> Best,
> Hang
>
> On Mon, Nov 6, 2023 at 14:30, Junrui Lee wrote:
>
>> Hi Madan,
>>
>> Do you mean you want to restart only the failed tasks, rather than restarting the
>> entire pipeline region? As far as I know, currently Flink does not support
>> task-level restart, but requires restarting the pipeline region.
>>
>> Best,
>> Junrui
>>
>> On Wed, Oct 11, 2023 at 12:37, Madan D via user wrote:
>>
>>> Hello Team,
>>> We are running the Flink pipeline by consuming data from multiple topics, but we
>>> recently encountered that if there's one topic having issues with its partitions,
>>> etc., the whole Flink pipeline is failing, which is affecting the other topics.
>>> Is there a way we can make the Flink pipeline keep running even after one of the
>>> topics has an issue? We tried to handle exceptions to make sure the job wouldn't
>>> fail, but it didn't help out.
>>>
>>> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
>>>
>>> Can you please provide any insights?
>>>
>>> Regards,
>>> Madan



Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread Andrew Otto
> unpredictable file schema(Table API)  in the source directory

You'll probably have to write some logic that helps predict the schema :)

Are there actual schemas for the CSV files somewhere?  JSONSchema or
something of the like? At Wikimedia we use JSONSchema (not with CSV
data, but it could work), and have code that can convert from JSONSchema
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java#22>
to Flink Schemas, either TypeInformation or Table API DataType
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>

Here's an example
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java#29>
in code docs for use with Kafka.  You could use this to read CSV
files instead?  Something like:

TableDescriptor.forConnector("filesystem")
.schema(JsonSchemaFlinkConverter.toSchemaBuilder(jsonSchema).build())
...
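
For comparison, a minimal self-contained sketch that builds such a table purely with
Flink's own Table API (the schema is hard-coded here; in practice it would be whatever
Schema you derive at runtime, e.g. from a JSONSchema as above, and the path is a
placeholder):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class ProgrammaticTableSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Schema built programmatically; swap this for one derived from your schema source.
        Schema schema = Schema.newBuilder()
                .column("col1", DataTypes.STRING())
                .column("col2", DataTypes.STRING())
                .column("col3", DataTypes.STRING())
                .column("col4", DataTypes.STRING())
                .build();

        TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
                .schema(schema)
                .option("path", "file:///home/techuser/inputdata")
                .option("source.monitor-interval", "1")
                .format("csv")
                .build();

        tEnv.createTemporaryTable("sample", descriptor);
        tEnv.from("sample").execute().print();
    }
}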

If you are doing pure SQL (not table api), you'll need to have something
that translates from your schema to SQL...or start implementing a custom
Catalog
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#user-defined-catalog>,
which uh, we kind of did
<https://wikitech.wikimedia.org/wiki/Event_Platform/Stream_Processing/Flink_Catalog>,
but it was not easy.









On Mon, Nov 6, 2023 at 1:30 PM arjun s  wrote:

> Thanks for your response.
> How should we address the issue of dealing with the unpredictable file
> schema(Table API)  in the source directory, as I previously mentioned in my
> email?
>
> Thanks and regards,
> Arjun
>
> On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:
>
>> Hi Arjun,
>>
>> If you can filter files by a regex pattern, I think the config
>> `source.path.regex-pattern`[1] maybe what you want.
>>
>>   'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter 
>> files to read under the -- directory 
>> of `path` option. This regex pattern should be   
>>  -- matched with the absolute file path. If this option is set,  
>>   -- the connector  will recursive all files 
>> under the directory-- of `path` 
>> option
>>
>>
>> Best,
>> Yu Chen
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
>>
>> --
>> *发件人:* arjun s 
>> *发送时间:* 2023年11月6日 20:50
>> *收件人:* user@flink.apache.org 
>> *主题:* Handling Schema Variability and Applying Regex Patterns in Flink
>> Job Configuration
>>
>> Hi team,
>> I'm currently utilizing the Table API function within my Flink job, with
>> the objective of reading records from CSV files located in a source
>> directory. To obtain the file names, I'm creating a table and specifying
>> the schema using the Table API in Flink. Consequently, when the schema
>> matches, my Flink job successfully submits and executes as intended.
>> However, in cases where the schema does not match, the job fails to submit.
>> Given that the schema of the files in the source directory is
>> unpredictable, I'm seeking a method to handle this situation.
>> Create table query
>> =
>> CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4
>> STRING,file.path` STRING NOT NULL METADATA) WITH ('connector' =
>> 'filesystem','path' = 'file:///home/techuser/inputdata','format' =
>> 'csv','source.monitor-interval' = '1')
>> =
>>
>> Furthermore, I have a question about whether there's a way to read files
>> from the source directory based on a specific regex pattern. This is
>> relevant in our situation because only file names that match a particular
>> pattern need to be processed by the Flink job.
>>
>> Thanks and Regards,
>> Arjun
>>
>


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread arjun s
Thanks for your response.
How should we address the issue of dealing with the unpredictable file
schema (Table API) in the source directory, as I previously mentioned in my
email?

Thanks and regards,
Arjun

On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:

> Hi Arjun,
>
> If you can filter files by a regex pattern, I think the config
> `source.path.regex-pattern`[1] maybe what you want.
>
>   'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter 
> files to read under the -- directory 
> of `path` option. This regex pattern should be
> -- matched with the absolute file path. If this option is set,
> -- the connector  will recursive all files 
> under the directory-- of `path` option
>
>
> Best,
> Yu Chen
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
>
> --
> *发件人:* arjun s 
> *发送时间:* 2023年11月6日 20:50
> *收件人:* user@flink.apache.org 
> *主题:* Handling Schema Variability and Applying Regex Patterns in Flink
> Job Configuration
>
> Hi team,
> I'm currently utilizing the Table API function within my Flink job, with
> the objective of reading records from CSV files located in a source
> directory. To obtain the file names, I'm creating a table and specifying
> the schema using the Table API in Flink. Consequently, when the schema
> matches, my Flink job successfully submits and executes as intended.
> However, in cases where the schema does not match, the job fails to submit.
> Given that the schema of the files in the source directory is
> unpredictable, I'm seeking a method to handle this situation.
> Create table query
> =
> CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4
> STRING,file.path` STRING NOT NULL METADATA) WITH ('connector' =
> 'filesystem','path' = 'file:///home/techuser/inputdata','format' =
> 'csv','source.monitor-interval' = '1')
> =
>
> Furthermore, I have a question about whether there's a way to read files
> from the source directory based on a specific regex pattern. This is
> relevant in our situation because only file names that match a particular
> pattern need to be processed by the Flink job.
>
> Thanks and Regards,
> Arjun
>


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread Chen Yu
Hi Arjun,

If you can filter files by a regex pattern, I think the config 
`source.path.regex-pattern`[1] maybe what you want.


  'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the
                                        -- directory of `path` option. This regex pattern should be
                                        -- matched with the absolute file path. If this option is set,
                                        -- the connector will recursive all files under the directory
                                        -- of `path` option

Best,
Yu Chen


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/


From: arjun s 
Sent: November 6, 2023, 20:50
To: user@flink.apache.org 
Subject: Handling Schema Variability and Applying Regex Patterns in Flink Job 
Configuration

Hi team,
I'm currently utilizing the Table API function within my Flink job, with the 
objective of reading records from CSV files located in a source directory. To 
obtain the file names, I'm creating a table and specifying the schema using the 
Table API in Flink. Consequently, when the schema matches, my Flink job 
successfully submits and executes as intended. However, in cases where the 
schema does not match, the job fails to submit. Given that the schema of the 
files in the source directory is unpredictable, I'm seeking a method to handle 
this situation.
Create table query
=
CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4 STRING,file.path` 
STRING NOT NULL METADATA) WITH ('connector' = 'filesystem','path' = 
'file:///home/techuser/inputdata','format' = 'csv','source.monitor-interval' = 
'1')
=

Furthermore, I have a question about whether there's a way to read files from 
the source directory based on a specific regex pattern. This is relevant in our 
situation because only file names that match a particular pattern need to be 
processed by the Flink job.

Thanks and Regards,
Arjun


Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread arjun s
Hi team,
I'm currently utilizing the Table API function within my Flink job, with
the objective of reading records from CSV files located in a source
directory. To obtain the file names, I'm creating a table and specifying
the schema using the Table API in Flink. Consequently, when the schema
matches, my Flink job successfully submits and executes as intended.
However, in cases where the schema does not match, the job fails to submit.
Given that the schema of the files in the source directory is
unpredictable, I'm seeking a method to handle this situation.
Create table query
=
CREATE TABLE sample (
  col1 STRING,
  col2 STRING,
  col3 STRING,
  col4 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///home/techuser/inputdata',
  'format' = 'csv',
  'source.monitor-interval' = '1'
)
=

Furthermore, I have a question about whether there's a way to read files
from the source directory based on a specific regex pattern. This is
relevant in our situation because only file names that match a particular
pattern need to be processed by the Flink job.

Thanks and Regards,
Arjun


Re: Flink Job Failed With Kafka Exception

2023-11-05 Thread Hang Ruan
Hi, Madan.

This error seems to indicate that there are some problems when the consumer
tries to read the topic metadata. If you use the same source for all of these
topics, the Kafka connector cannot skip one of them. As you say, you would need
to modify the connector's default behavior.
Maybe you should read the code in KafkaSourceEnumerator to see how to skip this error.

Best,
Hang

On Mon, Nov 6, 2023 at 14:30, Junrui Lee wrote:

> Hi Madan,
>
> Do you mean you want to restart only the failed tasks, rather than
> restarting the entire pipeline region? As far as I know, currently Flink
> does not support task-level restart, but requires restarting the pipeline
> region.
>
> Best,
> Junrui
>
> Madan D via user  于2023年10月11日周三 12:37写道:
>
>> Hello Team,
>> We are running the Flink pipeline by consuming data from multiple topics,
>> but we recently encountered that if there's one topic having issues with
>> participation, etc., the whole Flink pipeline is failing, which is
>> affecting topics. Is there a way we can make Flink Piplein keep running
>> even after one of the topics has an issue? We tried to handle exceptions to
>> make sure the job wouldn't fail, but it didn't help out.
>>
>> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
>>
>>
>> Can you please provide any insights?
>>
>>
>> Regards,
>> Madan
>>
>


Re: Flink Job Failed With Kafka Exception

2023-11-05 Thread Junrui Lee
Hi Madan,

Do you mean you want to restart only the failed tasks, rather than
restarting the entire pipeline region? As far as I know, currently Flink
does not support task-level restart, but requires restarting the pipeline
region.

Best,
Junrui

On Wed, Oct 11, 2023 at 12:37, Madan D via user wrote:

> Hello Team,
> We are running the Flink pipeline by consuming data from multiple topics,
> but we recently encountered that if there's one topic having issues with
> participation, etc., the whole Flink pipeline is failing, which is
> affecting topics. Is there a way we can make Flink Piplein keep running
> even after one of the topics has an issue? We tried to handle exceptions to
> make sure the job wouldn't fail, but it didn't help out.
>
> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
>
>
> Can you please provide any insights?
>
>
> Regards,
> Madan
>


Flink Job Failed With Kafka Exception

2023-10-10 Thread Madan D via user
Hello Team,

We are running the Flink pipeline by consuming data from multiple topics, but we
recently encountered that if there's one topic having issues with its partitions,
etc., the whole Flink pipeline fails, which affects the other topics. Is there a way
we can make the Flink pipeline keep running even after one of the topics has an
issue? We tried to handle exceptions to make sure the job wouldn't fail, but it
didn't help.

Caused by: java.lang.RuntimeException: Failed to get metadata for topics

Can you please provide any insights?

Regards,
Madan

Flink job reading from s3 path

2023-09-07 Thread Hou, Lijuan via user
Hi team,

I want to implement a Flink job that reads Avro files from an S3 path and outputs to
a Kafka topic.

Currently, I am using AvroInputFormat like this:

AvroInputFormat<Session> avroInputFormat =
new AvroInputFormat<>(new Path(S3PathString), Session.class);
TypeInformation<Session> typeInfo = TypeInformation.of(Session.class);
DataStream<Session> datastream = env.createInput(avroInputFormat, typeInfo);

But I encountered an AvroTypeException in the job.
Does it mean AvroInputFormat is not a good way to read from an S3 path?
What about FileSource? Is it better than AvroInputFormat?

Can I get some suggestions here? Thanks!

Best,
Lijuan

Attaching the exception in the job:
org.apache.avro.AvroTypeException: Found KafkaRecord, expecting ……Session, 
missing required field channel
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) ~[?:?]
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) ~[?:?]
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127) ~[?:?]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:240) ~[?:?]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[?:?]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[?:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[?:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[?:?]
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:263) ~[?:?]
at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170) ~[?:?]
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:390) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:352) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.1.jar:1.17.1]
at java.lang.Thread.run(Unknown Source) ~[?:?]






Re: flink-job-history page freezes when there are too many jobs

2023-07-28 Thread 阿华田
This doesn't solve the root problem. The main issue is that we have quite a lot of
jobs, and the business requires keeping several thousand of them.


阿华田
a15733178...@163.com


On 2023-07-28 at 11:28, Shammon FY wrote:
Hi,

You can control how many jobs are retained by configuring `jobstore.max-capacity` and `jobstore.expiration-time`; see [1] for the specific options.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options

Best,
Shammon FY

On Fri, Jul 28, 2023 at 10:17 AM 阿华田  wrote:

Currently flink-job-history has collected more than 5000 jobs. When clicking to view
all jobs, the job-history page freezes and becomes inaccessible. Does anyone have a
good way to solve this?
阿华田
a15733178...@163.com




Re: flink-job-history page freezes when there are too many jobs

2023-07-27 Thread Weihua Hu
Hi

The Flink UI needs to load all job information and render it in the UI, which easily
freezes the UI when there are many jobs. And not only on this page: opening the
subtask page of a job with high parallelism can also easily freeze the UI.

The Flink UI needs a pagination feature to reduce the data-loading and rendering pressure.

Best,
Weihua


On Fri, Jul 28, 2023 at 11:29 AM Shammon FY  wrote:

> Hi,
>
>
> You can control how many jobs are retained by configuring `jobstore.max-capacity` and `jobstore.expiration-time`; see [1] for the specific options.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options
>
> Best,
> Shammon FY
>
> On Fri, Jul 28, 2023 at 10:17 AM 阿华田  wrote:
>
> > Currently flink-job-history has collected more than 5000 jobs. When clicking to
> > view all jobs, the job-history page freezes and becomes inaccessible. Does anyone
> > have a good way to solve this?
> > 阿华田
> > a15733178...@163.com
> >
> >
>


Re: flink-job-history page freezes when there are too many jobs

2023-07-27 Thread Shammon FY
Hi,

You can control how many jobs are retained by configuring `jobstore.max-capacity` and `jobstore.expiration-time`; see [1] for the specific options.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options
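
For illustration, the corresponding flink-conf.yaml entries might look like the sketch
below (the values are only placeholders; pick limits that match how many finished jobs
you really need to keep):

jobstore.max-capacity: 1000        # keep at most 1000 finished jobs in the job store
jobstore.expiration-time: 86400    # expire finished jobs after 24 hours (in seconds)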

Best,
Shammon FY

On Fri, Jul 28, 2023 at 10:17 AM 阿华田  wrote:

> Currently flink-job-history has collected more than 5000 jobs. When clicking to
> view all jobs, the job-history page freezes and becomes inaccessible. Does anyone
> have a good way to solve this?
> 阿华田
> a15733178...@163.com
>
>


flink-job-history page freezes when there are too many jobs

2023-07-27 Thread 阿华田
Currently flink-job-history has collected more than 5000 jobs. When clicking to view
all jobs, the job-history page freezes and becomes inaccessible. Does anyone have a
good way to solve this?

阿华田
a15733178...@163.com



Flink job submission to Multi-VM Flink Cluster fails!

2023-06-02 Thread Z M Ang
Hello,


I can launch a Flink cluster (version 1.17.x) on my laptop with 1 Job
Manager and 3 Task Managers. The cluster starts, jobs can be submitted
correctly on the localhost (my laptop).

Next I tried to  launch this cluster on 4 VMs - 1 Master VM (for the Job
Manager) and 3 Worker VMs (for Task Managers). I am not using YARN, K8s or
Docker on any of these VMs.

The cluster starts up fine using "${FLINK_HOME}"/bin/start-cluster.sh.
i.e., running this on the command line on the Master VM does the expected -
it starts the job manager on the Master and then starts 1 Task Manager on
each Worker VM. (ssh connectivity between Master and Worker VMs is fine).

However, job submission("${FLINK_HOME}"/bin/flink run myapp.jar) fails with
the following exceptions:

> Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
> at 
> org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
> at 
> org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
> ... 10 more
> Caused by: java.io.IOException: Could not connect to BlobServer at address 
> localhost/127.0.0.1:37452
> at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:103)
> at 
> org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$null$3(JobSubmitHandler.java:199)
> at 
> org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:82)
> ... 11 more
> Caused by: java.net.ConnectException: Connection refused (Connection refused)
> at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
> at 
> java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
> at 
> java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
> at 
> java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
> at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> at java.base/java.net.Socket.connect(Socket.java:615)
> at org.apache.flink.runtime.blob.BlobClient.(BlobClient.java:97)
> ... 13 more
> ]
> at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
> at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
> at 
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
> ... 4 more
>
>

Looking at the DEBUG level logs on the Master (Job Manager) and the Workers
(Task Managers) ... it looks like the Task Managers on the Worker VMs are
all registered with the Job Manager on the Master VM (the Job Manager logs
show the Task Managers). Hence, connectivity and registration from Worker
VMs back to the Master VM (and the job manager) is fine.

The Web UI reflects this too. I can see the Job Manager and 3 Task managers
running on those 4 VMs.

Hence, it is ONLY the job submission that's failing with this "Could not
connect to Blob Server" exception. I suspect I have the configuration
incorrect.

Questions:

   1. Is there a reference implementation of a Multi-VM Flink Cluster (NOT
   on Docker)?
   2. How should taskmanager.*.*.* properties in flink-conf.yaml be
   configured for a multi-VM cluster? In particular, what should the values of
   "taskmanager.bind-host" and "task-manager.host" be on the Master VM?
   3. Does a multi-VM flink cluster need a shared storage directory? If so,
   is there any documentation on configuring this?
   4. From reading various Flink enhancement requests and mailing list
   posts, I found that the Blob Server needs to be accessible from the Task
   Managers on an address that's external facing. Note from the exception
   trace above that the attempt to connect to the BlobServer is being made on
   "localhost/127.0.0.1:37452". I cannot find any configuration item that
   allows me to set the host or bind-host for the blobserver. How is this to
   be done?
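
To make questions 2 and 4 concrete, here is a rough sketch of the kind of
flink-conf.yaml layout I am unsure about (master-vm and worker-vm-1 are placeholder
hostnames):

# on the Master VM
jobmanager.rpc.address: master-vm      # an externally resolvable name, not localhost
jobmanager.bind-host: 0.0.0.0
rest.address: master-vm
rest.bind-address: 0.0.0.0
blob.server.port: 6124                 # pin the BlobServer to a known port

# overridden on each Worker VM
taskmanager.bind-host: 0.0.0.0
taskmanager.host: worker-vm-1          # that worker's own externally resolvable name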


Please advise. Thanks in advance for your responses.


Re: Flink Job Failure for version 1.16

2023-05-14 Thread Hangxiang Yu
Hi,
I may have missed something, so could you share a bit more:

 I have recently migrated from 1.13.6 to 1.16.1, I can see there is a
> performance degradation...


Are you referring to a decrease in checkpointing performance when you mention
the performance degradation?
Does it happen simply by upgrading from 1.13.6 to 1.16.1, without any
modifications to the configuration or the job?
Could you share the configuration from before and after upgrading?

Is there any issue with this Flink version or the new RocksDB version? What
> should be the action item for this Exception?
> The maximum savepoint size is 80.2 GB and we periodically(every 20
> minutes) take the savepoint for the job.
>

The version of RocksDB was upgraded in 1.14, but in theory it should not
increase the checkpoint size.
So you found that the checkpoint size increased after upgrading? Could you
also share some checkpoint metrics / configuration from before and after
upgrading?

On Fri, May 12, 2023 at 9:06 PM neha goyal  wrote:

> Hi Everyone, can someone please shade some light when the Checkpoint
> Coordinator is suspending Error comes and what should I do to avoid this?
> it is impacting the production pipeline after the version upgrade. It is
> related to resource crunch in the pipeline?
> Thank You
>
> On Thu, May 11, 2023 at 10:35 AM neha goyal  wrote:
>
>> I have recently migrated from 1.13.6 to 1.16.1, I can see there is a
>> performance degradation for the Flink pipeline which is using Flink's
>> managed state ListState, MapState, etc. Pipelines are frequently failing
>> with the Exception:
>>
>> 06:59:42.021 [Checkpoint Timer] WARN  o.a.f.r.c.CheckpointFailureManager
>> - Failed to trigger or complete checkpoint 36755 for job
>> d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
>>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
>> Checkpoint expired before completing.
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:750)
>> 07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN
>>  a.remote.ReliableDeliverySupervisor - Association with remote system
>> [akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367]
>> has failed, address is now gated for [50] ms. Reason: [Disassociated]
>>  akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
>> akka.remote.ReliableDeliverySupervisor07:18:15.257 [flink-metrics-23] WARN
>>  a.remote.ReliableDeliverySupervisor - Association with remote system
>> [akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639]
>> has failed, address is now gated for [50] ms. Reason: [Disassociated]
>>  akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
>> akka.remote.ReliableDeliverySupervisor07:18:15.331
>> [flink-akka.actor.default-dispatcher-31] WARN
>>  o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete
>> checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive
>> failed attempts so far)
>>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
>> Checkpoint Coordinator is suspending.
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>> at
>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
>> at
>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)
>>
>> Is there any issue with this Flink version or the new RocksDB version?
>> What should be the action item for this Exception?
>> The maximum savepoint size is 80.2 GB and we periodically(every 20
>> minutes) take the savepoint for the job.
>> Checkpoint Type: aligned checkpoint
>>
>

-- 
Best,
Hangxiang.


Re: Flink Job Failure for version 1.16

2023-05-12 Thread neha goyal
Hi everyone, can someone please shed some light on when the "Checkpoint
Coordinator is suspending" error occurs and what I should do to avoid it?
It is impacting the production pipeline after the version upgrade. Is it
related to a resource crunch in the pipeline?
Thank You

On Thu, May 11, 2023 at 10:35 AM neha goyal  wrote:

> I have recently migrated from 1.13.6 to 1.16.1, I can see there is a
> performance degradation for the Flink pipeline which is using Flink's
> managed state ListState, MapState, etc. Pipelines are frequently failing
> with the Exception:
>
> 06:59:42.021 [Checkpoint Timer] WARN  o.a.f.r.c.CheckpointFailureManager -
> Failed to trigger or complete checkpoint 36755 for job
> d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint expired before completing.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> 07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN
>  a.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367]
> has failed, address is now gated for [50] ms. Reason: [Disassociated]
>  akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
> akka.remote.ReliableDeliverySupervisor07:18:15.257 [flink-metrics-23] WARN
>  a.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639]
> has failed, address is now gated for [50] ms. Reason: [Disassociated]
>  akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
> akka.remote.ReliableDeliverySupervisor07:18:15.331
> [flink-akka.actor.default-dispatcher-31] WARN
>  o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete
> checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive
> failed attempts so far)
>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint Coordinator is suspending.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)
>
> Is there any issue with this Flink version or the new RocksDB version?
> What should be the action item for this Exception?
> The maximum savepoint size is 80.2 GB and we periodically(every 20
> minutes) take the savepoint for the job.
> Checkpoint Type: aligned checkpoint
>


Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-11 Thread Weihua Hu
Hi,

Checkpoints are only used for failover within one job. Once a job is cancelled,
the related checkpoint counter and metadata (stored in the HA service) will be removed,
but the checkpoint data can be retained if you have configured that.

IIUC, redeploying/updating the job will cancel the old job and then start a new
one. They are two different jobs as far as Flink is concerned, even if the jobId
is the same. In this case, the checkpoints are retained, but the HA data is
removed, so a new job with the same jobId will throw an exception.

If you do not need to restore from the old checkpoint when re-deploying the job,
you can disable retained checkpoints [1].
If you need to restore from the old checkpoint, you need to know the latest
checkpoint path and specify it in the start command [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
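
A minimal sketch of both options (assuming the DataStream API and Flink 1.15+ for the
setExternalizedCheckpointCleanup call; paths, intervals and class names are
illustrative only):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRetentionSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // one checkpoint per minute

        // Option A: do not retain checkpoints once the job is cancelled, so a
        // re-deployed job can never collide with old checkpoint files.
        // (On versions before 1.15 the equivalent call is enableExternalizedCheckpoints.)
        env.getCheckpointConfig()
           .setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // Option B: keep RETAIN_ON_CANCELLATION instead and restore explicitly, e.g.
        //   bin/flink run -s file:///opt/flink/pm/checkpoint/<job-id>/chk-42 my-job.jar
        // where -s points at the latest retained checkpoint.

        // ... build and execute the job as usual.
    }
}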


Best,
Weihua


On Thu, May 11, 2023 at 1:43 PM amenreet sodhi  wrote:

> Hey Hang,
>
> I am deploying my Flink Job in HA application mode, Whenever I redeploy my
> job, or deploy an updated version of the job, it's using the same job_id. I
> haven't configured anywhere to use a fixed job id, I think it's doing it by
> default. Can you share where I can configure this? I tried it once before,
> but couldn't find anything.
>
> Thanks
> Regards
> Amenreet Singh Sodhi
>
> On Wed, May 10, 2023 at 8:36 AM Hang Ruan  wrote:
>
>> Hi, amenreet,
>>
>> As Hangxiang said, we should use a new checkpoint dir if the new job has
>> the same jobId as the old one . Or else you should not use a fixed jobId
>> and the checkpoint dir will not conflict.
>>
>> Best,
>> Hang
>>
>> Hangxiang Yu  于2023年5月10日周三 10:35写道:
>>
>>> Hi,
>>> I guess you used a fixed JOB_ID, and configured the same checkpoint dir
>>> as before ?
>>> And you may also start the job without before state ?
>>> The new job cannot know anything about before checkpoints, that's why
>>> the new job will fail when it tries to generate a new checkpoint.
>>> I'd like to suggest you to use different JOB_ID for different jobs, or
>>> set a different checkpoint dir for a new job.
>>>
>>> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Is there any way to prevent restart of flink job, or override the
>>>> checkpoint metadata, if for some reason there exists a checkpoint by same
>>>> name. I get the following exception and my job restarts, have been trying
>>>> to find solution for a very long time but havent found anything useful yet,
>>>> other than manually cleaning.
>>>>
>>>> 2023-02-27 10:00:50,360 WARN  
>>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>>>> [] - Failed to trigger or complete checkpoint 1 for job
>>>> 6e6b1332. (0 consecutive failed attempts so far)
>>>>
>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>>>> finalize checkpoint.
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> [?:?]
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> [?:?]
>>>>
>>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>>>
>>>> Caused by: java.io.IOException: Target file
>>>

Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-10 Thread amenreet sodhi
Hey Hang,

I am deploying my Flink job in HA application mode. Whenever I redeploy my
job, or deploy an updated version of it, it uses the same job_id. I haven't
configured it anywhere to use a fixed job id; I think it does that by
default. Can you share where I can configure this? I tried once before,
but couldn't find anything.

Thanks
Regards
Amenreet Singh Sodhi

On Wed, May 10, 2023 at 8:36 AM Hang Ruan  wrote:

> Hi, amenreet,
>
> As Hangxiang said, we should use a new checkpoint dir if the new job has
> the same jobId as the old one . Or else you should not use a fixed jobId
> and the checkpoint dir will not conflict.
>
> Best,
> Hang
>
> Hangxiang Yu  于2023年5月10日周三 10:35写道:
>
>> Hi,
>> I guess you used a fixed JOB_ID, and configured the same checkpoint dir
>> as before ?
>> And you may also start the job without before state ?
>> The new job cannot know anything about before checkpoints, that's why the
>> new job will fail when it tries to generate a new checkpoint.
>> I'd like to suggest you to use different JOB_ID for different jobs, or
>> set a different checkpoint dir for a new job.
>>
>> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi 
>> wrote:
>>
>>> Hi all,
>>>
>>> Is there any way to prevent restart of flink job, or override the
>>> checkpoint metadata, if for some reason there exists a checkpoint by same
>>> name. I get the following exception and my job restarts, have been trying
>>> to find solution for a very long time but havent found anything useful yet,
>>> other than manually cleaning.
>>>
>>> 2023-02-27 10:00:50,360 WARN  
>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>>> [] - Failed to trigger or complete checkpoint 1 for job
>>> 6e6b1332. (0 consecutive failed attempts so far)
>>>
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>>> finalize checkpoint.
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> [?:?]
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> [?:?]
>>>
>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>>
>>> Caused by: java.io.IOException: Target file
>>> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
>>> already exists.
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:64)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> ... 7 more
>>>
>>> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobM

Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-10 Thread amenreet sodhi
Hi Weihua,

I am deploying my flink job in HA application mode on a kubernetes cluster.
I am using an external nfs mount for storing checkpoints. For some reason,
whenever I deploy an updated version of my application, it uses the same
job_id for the new job as for the previous job. Thus the flink job creates
checkpoints in the same directory, and at whatever point it encounters the
same checkpoint path(already existing checkpoint from previous versions of
my job) it throws the above error, and the job restarts. I have set my job
restart count as 3. So if this happens continuously for 3 times, the
jobmanager pod restarts, and then it starts the job again from checkpoint-0
or from the last saved savepoint. Then the same story repeats.

Thanks
Regards
Amenreet Singh Sodhi

On Wed, May 10, 2023 at 9:10 AM Weihua Hu  wrote:

> Hi,
>
> if for some reason there exists a checkpoint by same name.
>>
> Could you give more details about your scenarios here?
> From your description, I guess this problem occurred when a job restart,
> does this restart is triggered personally?
>
> In common restart processing, the job will retrieve the latest checkpoint
> from a high-available service(zookeeper or kubernetes),
> and then restore from it and make a new checkpoint with a new
> checkpoint-id.
> In this case, the job does not recover from the old checkpoint, but the
> old checkpoint path already exists.
>
> Best,
> Weihua
>
>
> On Wed, May 10, 2023 at 11:07 AM Hang Ruan  wrote:
>
>> Hi, amenreet,
>>
>> As Hangxiang said, we should use a new checkpoint dir if the new job has
>> the same jobId as the old one . Or else you should not use a fixed jobId
>> and the checkpoint dir will not conflict.
>>
>> Best,
>> Hang
>>
>> Hangxiang Yu  于2023年5月10日周三 10:35写道:
>>
>>> Hi,
>>> I guess you used a fixed JOB_ID, and configured the same checkpoint dir
>>> as before ?
>>> And you may also start the job without before state ?
>>> The new job cannot know anything about before checkpoints, that's why
>>> the new job will fail when it tries to generate a new checkpoint.
>>> I'd like to suggest you to use different JOB_ID for different jobs, or
>>> set a different checkpoint dir for a new job.
>>>
>>> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Is there any way to prevent restart of flink job, or override the
>>>> checkpoint metadata, if for some reason there exists a checkpoint by same
>>>> name. I get the following exception and my job restarts, have been trying
>>>> to find solution for a very long time but havent found anything useful yet,
>>>> other than manually cleaning.
>>>>
>>>> 2023-02-27 10:00:50,360 WARN  
>>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>>>> [] - Failed to trigger or complete checkpoint 1 for job
>>>> 6e6b1332. (0 consecutive failed attempts so far)
>>>>
>>>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>>>> finalize checkpoint.
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> [?:?]
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> [?:?]
>>>>
>>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>>>
>>>> Caused by

Flink Job Failure for version 1.16

2023-05-10 Thread neha goyal
I have recently migrated from 1.13.6 to 1.16.1, and I can see there is a
performance degradation for the Flink pipeline that uses Flink's
managed state (ListState, MapState, etc.). Pipelines are frequently failing
with the exception:

06:59:42.021 [Checkpoint Timer] WARN  o.a.f.r.c.CheckpointFailureManager -
Failed to trigger or complete checkpoint 36755 for job
d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
 org.apache.flink.runtime.checkpoint.CheckpointFailureManager
org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
Checkpoint expired before completing.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN
 a.remote.ReliableDeliverySupervisor - Association with remote system
[akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367]
has failed, address is now gated for [50] ms. Reason: [Disassociated]
 akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
akka.remote.ReliableDeliverySupervisor07:18:15.257 [flink-metrics-23] WARN
 a.remote.ReliableDeliverySupervisor - Association with remote system
[akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639]
has failed, address is now gated for [50] ms. Reason: [Disassociated]
 akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
akka.remote.ReliableDeliverySupervisor07:18:15.331
[flink-akka.actor.default-dispatcher-31] WARN
 o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete
checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive
failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointFailureManager
org.apache.flink.runtime.checkpoint.CheckpointException:
Checkpoint Coordinator is suspending.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
at
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
at
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)

Is there any issue with this Flink version or the new RocksDB version? What
should be the action item for this exception?
The maximum savepoint size is 80.2 GB, and we periodically (every 20 minutes)
take a savepoint for the job.
Checkpoint Type: aligned checkpoint
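
For anyone hitting the same "Checkpoint expired before completing" failure, below is a
minimal sketch of the checkpoint settings that are usually tuned first. It is not taken
from the reporter's job; the interval and timeout values are example assumptions, and
unaligned checkpoints only help when the expiry is caused by backpressured barriers.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);                                 // checkpoint every 60s (example value)
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60_000L);     // 30 min before "expired" (example value)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L); // breathing room between checkpoints
        env.getCheckpointConfig().enableUnalignedCheckpoints();           // helps when barriers are stuck behind backpressure
        // ... build the actual pipeline here, then call env.execute(...)
    }
}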


Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Weihua Hu
Hi,

if for some reason there exists a checkpoint by same name.
>
Could you give more details about your scenario here?
From your description, I guess this problem occurred when a job restarted.
Was this restart triggered manually?

In normal restart processing, the job retrieves the latest checkpoint
from a high-availability service (ZooKeeper or Kubernetes),
restores from it, and then makes new checkpoints with new checkpoint ids.
In your case, the job did not recover from the old checkpoint, yet the old
checkpoint path already exists.

Best,
Weihua


On Wed, May 10, 2023 at 11:07 AM Hang Ruan  wrote:

> Hi, amenreet,
>
> As Hangxiang said, we should use a new checkpoint dir if the new job has
> the same jobId as the old one . Or else you should not use a fixed jobId
> and the checkpoint dir will not conflict.
>
> Best,
> Hang
>
> Hangxiang Yu  wrote on Wed, May 10, 2023 at 10:35:
>
>> Hi,
>> I guess you used a fixed JOB_ID, and configured the same checkpoint dir
>> as before ?
>> And you may also start the job without before state ?
>> The new job cannot know anything about before checkpoints, that's why the
>> new job will fail when it tries to generate a new checkpoint.
>> I'd like to suggest you to use different JOB_ID for different jobs, or
>> set a different checkpoint dir for a new job.
>>
>> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi 
>> wrote:
>>
>>> Hi all,
>>>
>>> Is there any way to prevent restart of flink job, or override the
>>> checkpoint metadata, if for some reason there exists a checkpoint by same
>>> name. I get the following exception and my job restarts, have been trying
>>> to find solution for a very long time but havent found anything useful yet,
>>> other than manually cleaning.
>>>
>>> 2023-02-27 10:00:50,360 WARN  
>>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>>> [] - Failed to trigger or complete checkpoint 1 for job
>>> 6e6b1332. (0 consecutive failed attempts so far)
>>>
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>>> finalize checkpoint.
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> [?:?]
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> [?:?]
>>>
>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>>
>>> Caused by: java.io.IOException: Target file
>>> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
>>> already exists.
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
>>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>>
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>

Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Hang Ruan
Hi, amenreet,

As Hangxiang said, you should use a new checkpoint dir if the new job has
the same jobId as the old one. Otherwise, do not use a fixed jobId,
and the checkpoint dirs will not conflict.

Best,
Hang

Hangxiang Yu  wrote on Wed, May 10, 2023 at 10:35:

> Hi,
> I guess you used a fixed JOB_ID, and configured the same checkpoint dir as
> before ?
> And you may also start the job without before state ?
> The new job cannot know anything about before checkpoints, that's why the
> new job will fail when it tries to generate a new checkpoint.
> I'd like to suggest you to use different JOB_ID for different jobs, or set
> a different checkpoint dir for a new job.
>
> On Tue, May 9, 2023 at 9:38 PM amenreet sodhi  wrote:
>
>> Hi all,
>>
>> Is there any way to prevent restart of flink job, or override the
>> checkpoint metadata, if for some reason there exists a checkpoint by same
>> name. I get the following exception and my job restarts, have been trying
>> to find solution for a very long time but havent found anything useful yet,
>> other than manually cleaning.
>>
>> 2023-02-27 10:00:50,360 WARN  
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>> [] - Failed to trigger or complete checkpoint 1 for job
>> 6e6b1332. (0 consecutive failed attempts so far)
>>
>> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
>> finalize checkpoint.
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>>
>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>
>> Caused by: java.io.IOException: Target file
>> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
>> already exists.
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> ... 7 more
>>
>> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Error while processing AcknowledgeCheckpoint message
>>
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> finalize the pending checkpoint 1. Failure reason: Failure to finalize
>> checkpoint.
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>> ~[event_executor-1.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.flink.runt

Re: Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread Hangxiang Yu
Hi,
I guess you used a fixed JOB_ID and configured the same checkpoint dir as
before?
And you may also have started the job without the previous state?
The new job knows nothing about the previous checkpoints; that's why the
new job fails when it tries to generate a new checkpoint.
I'd suggest using a different JOB_ID for different jobs, or setting
a different checkpoint dir for a new job.
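
A minimal sketch of the second suggestion (a per-deployment checkpoint dir), assuming the
job is free to set it programmatically; the "deploymentId" value below is a made-up
example, e.g. a release tag or timestamp generated at submission time:

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PerDeploymentCheckpointDir {
    public static void main(String[] args) throws Exception {
        // hypothetical per-submission id so two runs never share chk-* paths
        String deploymentId = "release-2023-05-10";
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
                "file:///opt/flink/pm/checkpoint/" + deploymentId);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline, then call env.execute(...)
    }
}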

On Tue, May 9, 2023 at 9:38 PM amenreet sodhi  wrote:

> Hi all,
>
> Is there any way to prevent restart of flink job, or override the
> checkpoint metadata, if for some reason there exists a checkpoint by same
> name. I get the following exception and my job restarts, have been trying
> to find solution for a very long time but havent found anything useful yet,
> other than manually cleaning.
>
> 2023-02-27 10:00:50,360 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> [] - Failed to trigger or complete checkpoint 1 for job
> 6e6b1332. (0 consecutive failed attempts so far)
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
> finalize checkpoint.
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
>
> at java.lang.Thread.run(Thread.java:834) [?:?]
>
> Caused by: java.io.IOException: Target file
> file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
> already exists.
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> ... 7 more
>
> 2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
> [] - Error while processing AcknowledgeCheckpoint message
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> finalize the pending checkpoint 1. Failure reason: Failure to finalize
> checkpoint.
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
> ~[event_executor-1.0-SNAPSHOT.jar:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
>
> at java.lang.Thread.run(Thread.java:834) [?:?]
>
> Caused by: java.io.IOException: Target file
> file:/op

Flink Job Restarts if the metadata already exists for some Checkpoint

2023-05-09 Thread amenreet sodhi
Hi all,

Is there any way to prevent a restart of the Flink job, or to override the
checkpoint metadata, if for some reason a checkpoint with the same name
already exists? I get the following exception and my job restarts. I have been
trying to find a solution for a very long time but haven't found anything useful yet,
other than manually cleaning up.

2023-02-27 10:00:50,360 WARN
org.apache.flink.runtime.checkpoint.CheckpointFailureManager
[] - Failed to trigger or complete checkpoint 1 for job
6e6b1332. (0 consecutive failed attempts so far)

org.apache.flink.runtime.checkpoint.CheckpointException: Failure to
finalize checkpoint.

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]

at java.lang.Thread.run(Thread.java:834) [?:?]

Caused by: java.io.IOException: Target file
file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
already exists.

at
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:332)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
~[event_executor-1.0-SNAPSHOT.jar:?]

... 7 more

2023-02-27 10:00:50,374 WARN  org.apache.flink.runtime.jobmaster.JobMaster
  [] - Error while processing AcknowledgeCheckpoint message

org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize
the pending checkpoint 1. Failure reason: Failure to finalize checkpoint.

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1381)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
~[event_executor-1.0-SNAPSHOT.jar:?]

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]

at java.lang.Thread.run(Thread.java:834) [?:?]

Caused by: java.io.IOException: Target file
file:/opt/flink/pm/checkpoint/6e6b1332/chk-1/_metadata
already exists.

at
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
~[event_executor-1.0-SNAPSHOT.jar:?]


Please let me know if anyone knows how to resolve this issue.

Thanks and Regards

Amenreet Singh Sodhi


Re: Flink Job across Data Centers

2023-04-13 Thread Hang Ruan
Hi, Chirag,

I am not sure whether this FLIP-268[1] is what you want.

Best,
Hang

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-268%3A+Kafka+Rack+Awareness
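
For reference, a minimal sketch of the client side of rack-aware reads with the current
KafkaSource, assuming the brokers are configured with
replica.selector.class=RackAwareReplicaSelector and that all TaskManagers of one
deployment sit in the same DC (a static client.rack only makes sense then; FLIP-268 adds
a per-reader supplier for the general case). The topic, bootstrap servers, group id, and
rack id are example values:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class RackAwareKafkaSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("my-kafka:9092")              // example address
                .setTopics("input-topic")                          // example topic
                .setGroupId("flink-rack-aware-consumer")           // example group id
                .setProperty("client.rack", "dc1")                 // rack/DC of this deployment
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}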

Andrew Otto  wrote on Wed, Apr 12, 2023 at 22:12:

> Hi, I asked a similar question in this thread
> <https://lists.apache.org/thread/1f01zo1lqcmhvosptpjlm6k3mgx0sv1m>, which
> might have some relevant info.
>
> On Wed, Apr 12, 2023 at 7:23 AM Chirag Dewan via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>> Can anyone share any experience on running Flink jobs across data centers?
>>
>> I am trying to create a Multi site/Geo Replicated Kafka cluster. I want
>> that my Flink job to be closely colocated with my Kafka multi site cluster.
>> If the Flink job is bound to a single data center, I believe we will
>> observe a lot of client latency by trying to access the broker in another
>> DC.
>>
>> Rather if I can make my Flink Kafka collectors as rack aware and start
>> fetching data from the closest Kafka broker, I should get better results.
>>
>> I will be deploying Flink 1.16 on Kubernetes with Strimzi managed Apache
>> Kafka.
>>
>> Thanks.
>>
>>


Re: Flink job manager conditional start of flink jobs

2023-04-13 Thread Hang Ruan
Hi, naga,

I agree with Shammon's suggestion after reading the context. Maybe you
need a 'Job Management Service' to manage all jobs across different
namespaces. I think this kind of job management is not suitable for implementation
in the Flink engine.

Best,
Hang

Shammon FY  wrote on Thu, Apr 13, 2023 at 11:34:

> Hi
>
> The job in ns2 has the permission to stop the job in ns1? How about
> managing the relationship in your `Job Submission Service` if it exists.
> The service can check and stop the job in ns1 before it submitting the job
> to ns2, what do you think?
>
> Best,
> Shammon FY
>
>
> On Thu, Apr 13, 2023 at 10:50 AM naga sudhakar 
> wrote:
>
>> Hi,
>> Thanks for your reply.
>> It is slightly different, would be happy to have any suggestion  for the
>> scenario you mentioned.
>> My scenario: I have 2 namespaces say ns1,ns2. I have to make sure only
>> one of ns1 or ns2 should run my flink jobs. Say initially ns1 is running
>> flink jobs, later planned to move them to ns2. Now when I start in ns2, I
>> can make an api call to ns1 jobmanager about running jobs and if no jobs
>> then only I should start in ns2. I can introduce this logic inside the
>> flink job java main method  where my total streaming logic present. So if I
>> identify then, how can I stop the initiated job?
>>
>> Thanks,
>> Nagasudhakar.
>>
>> On Thu, 13 Apr, 2023, 7:08 am Shammon FY,  wrote:
>>
>>> Hi naga
>>>
>>> Could you provide a specific description of your scene? It sounds like
>>> your requirement requires a uniqueness check to ensure that there are no
>>> multiple identical jobs running simultaneously, right?
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Wed, Apr 12, 2023 at 4:08 PM naga sudhakar 
>>> wrote:
>>>
>>>> Thanks for your email.
>>>> I am looking  more in terms of running  these flinkk jobs in multi
>>>> names pace environment and make sure only one namespace  flink jobs are
>>>> running.
>>>> So on the Job manager when i try to start a flink job, it has to check
>>>> if it's allowed to run in this namespace  or not and accordingly flink job
>>>> shud turn into running state otherwise it shud cancel  by itself
>>>>
>>>>
>>>>
>>>> On Wed, 12 Apr, 2023, 12:06 pm Gen Luo,  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is the job you want to start running or already finished?
>>>>> If the job is running, this is simply a failover or a JM failover
>>>>> case.
>>>>> While if the job has finished, there's no such feature that can
>>>>> restart the job
>>>>> automatically, AFAIK.  The job has to be submitted again.
>>>>>
>>>>> On Wed, Apr 12, 2023 at 10:58 AM naga sudhakar 
>>>>> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>> Greetings!!
>>>>>> Just wanted to know when job manager or task manager is being
>>>>>> restarted, is there a way to run the existing flink jobs based on a
>>>>>> condition? Same query when I am starting flink job fresh also.
>>>>>>
>>>>>> Please let me know if any more information is required from my side.
>>>>>>
>>>>>> Thanks & Regards
>>>>>> Nagasudhakar  Sajja.
>>>>>>
>>>>>


Re: Flink job manager conditional start of flink jobs

2023-04-12 Thread Shammon FY
Hi

Does the job in ns2 have permission to stop the job in ns1? How about
managing the relationship in your `Job Submission Service`, if one exists?
The service can check and stop the job in ns1 before submitting the job
to ns2. What do you think?
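
A rough sketch of that check-and-stop step against the ns1 JobManager REST API
(GET /jobs, then PATCH /jobs/<jobid>?mode=cancel), assuming the ns1 REST endpoint is
reachable under the hypothetical address used below; JSON parsing and error handling are
left out:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class Ns1JobGuard {
    // Hypothetical ns1 JobManager REST address; adjust to your service name and port.
    private static final String NS1 = "http://jobmanager.ns1:8081";
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    /** Returns the raw JSON of GET /jobs, e.g. {"jobs":[{"id":"...","status":"RUNNING"}]}. */
    static String listNs1Jobs() throws Exception {
        HttpRequest req = HttpRequest.newBuilder(URI.create(NS1 + "/jobs")).GET().build();
        return CLIENT.send(req, HttpResponse.BodyHandlers.ofString()).body();
    }

    /** Cancels one ns1 job via PATCH /jobs/<jobid>?mode=cancel before starting it in ns2. */
    static void cancelNs1Job(String jobId) throws Exception {
        HttpRequest req = HttpRequest.newBuilder(URI.create(NS1 + "/jobs/" + jobId + "?mode=cancel"))
                .method("PATCH", HttpRequest.BodyPublishers.noBody())
                .build();
        CLIENT.send(req, HttpResponse.BodyHandlers.ofString());
    }
}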

Best,
Shammon FY


On Thu, Apr 13, 2023 at 10:50 AM naga sudhakar 
wrote:

> Hi,
> Thanks for your reply.
> It is slightly different, would be happy to have any suggestion  for the
> scenario you mentioned.
> My scenario: I have 2 namespaces say ns1,ns2. I have to make sure only one
> of ns1 or ns2 should run my flink jobs. Say initially ns1 is running flink
> jobs, later planned to move them to ns2. Now when I start in ns2, I can
> make an api call to ns1 jobmanager about running jobs and if no jobs then
> only I should start in ns2. I can introduce this logic inside the flink job
> java main method  where my total streaming logic present. So if I identify
> then, how can I stop the initiated job?
>
> Thanks,
> Nagasudhakar.
>
> On Thu, 13 Apr, 2023, 7:08 am Shammon FY,  wrote:
>
>> Hi naga
>>
>> Could you provide a specific description of your scene? It sounds like
>> your requirement requires a uniqueness check to ensure that there are no
>> multiple identical jobs running simultaneously, right?
>>
>> Best,
>> Shammon FY
>>
>> On Wed, Apr 12, 2023 at 4:08 PM naga sudhakar 
>> wrote:
>>
>>> Thanks for your email.
>>> I am looking  more in terms of running  these flinkk jobs in multi names
>>> pace environment and make sure only one namespace  flink jobs are running.
>>> So on the Job manager when i try to start a flink job, it has to check
>>> if it's allowed to run in this namespace  or not and accordingly flink job
>>> shud turn into running state otherwise it shud cancel  by itself
>>>
>>>
>>>
>>> On Wed, 12 Apr, 2023, 12:06 pm Gen Luo,  wrote:
>>>
>>>> Hi,
>>>>
>>>> Is the job you want to start running or already finished?
>>>> If the job is running, this is simply a failover or a JM failover case.
>>>> While if the job has finished, there's no such feature that can restart
>>>> the job
>>>> automatically, AFAIK.  The job has to be submitted again.
>>>>
>>>> On Wed, Apr 12, 2023 at 10:58 AM naga sudhakar 
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>> Greetings!!
>>>>> Just wanted to know when job manager or task manager is being
>>>>> restarted, is there a way to run the existing flink jobs based on a
>>>>> condition? Same query when I am starting flink job fresh also.
>>>>>
>>>>> Please let me know if any more information is required from my side.
>>>>>
>>>>> Thanks & Regards
>>>>> Nagasudhakar  Sajja.
>>>>>
>>>>


Re: Flink job manager conditional start of flink jobs

2023-04-12 Thread naga sudhakar
Hi,
Thanks for your reply.
It is slightly different; I would be happy to have any suggestion for the
scenario you mentioned.
My scenario: I have 2 namespaces, say ns1 and ns2. I have to make sure only one
of ns1 or ns2 runs my Flink jobs. Say initially ns1 is running the Flink
jobs, and later we plan to move them to ns2. Now when I start in ns2, I can
make an API call to the ns1 JobManager to check for running jobs, and only if
there are no jobs should I start in ns2. I can introduce this logic inside the
Flink job's Java main method, where my whole streaming logic is present. So if I
find a running job in ns1, how can I stop the job that was just initiated in ns2?

Thanks,
Nagasudhakar.

On Thu, 13 Apr, 2023, 7:08 am Shammon FY,  wrote:

> Hi naga
>
> Could you provide a specific description of your scene? It sounds like
> your requirement requires a uniqueness check to ensure that there are no
> multiple identical jobs running simultaneously, right?
>
> Best,
> Shammon FY
>
> On Wed, Apr 12, 2023 at 4:08 PM naga sudhakar 
> wrote:
>
>> Thanks for your email.
>> I am looking  more in terms of running  these flinkk jobs in multi names
>> pace environment and make sure only one namespace  flink jobs are running.
>> So on the Job manager when i try to start a flink job, it has to check if
>> it's allowed to run in this namespace  or not and accordingly flink job
>> shud turn into running state otherwise it shud cancel  by itself
>>
>>
>>
>> On Wed, 12 Apr, 2023, 12:06 pm Gen Luo,  wrote:
>>
>>> Hi,
>>>
>>> Is the job you want to start running or already finished?
>>> If the job is running, this is simply a failover or a JM failover case.
>>> While if the job has finished, there's no such feature that can restart
>>> the job
>>> automatically, AFAIK.  The job has to be submitted again.
>>>
>>> On Wed, Apr 12, 2023 at 10:58 AM naga sudhakar 
>>> wrote:
>>>
>>>> Hi Team,
>>>> Greetings!!
>>>> Just wanted to know when job manager or task manager is being
>>>> restarted, is there a way to run the existing flink jobs based on a
>>>> condition? Same query when I am starting flink job fresh also.
>>>>
>>>> Please let me know if any more information is required from my side.
>>>>
>>>> Thanks & Regards
>>>> Nagasudhakar  Sajja.
>>>>
>>>


Re: Flink job manager conditional start of flink jobs

2023-04-12 Thread Shammon FY
Hi naga

Could you provide a specific description of your scenario? It sounds like your
requirement needs a uniqueness check to ensure that there are no
multiple identical jobs running simultaneously, right?

Best,
Shammon FY

On Wed, Apr 12, 2023 at 4:08 PM naga sudhakar 
wrote:

> Thanks for your email.
> I am looking  more in terms of running  these flinkk jobs in multi names
> pace environment and make sure only one namespace  flink jobs are running.
> So on the Job manager when i try to start a flink job, it has to check if
> it's allowed to run in this namespace  or not and accordingly flink job
> shud turn into running state otherwise it shud cancel  by itself
>
>
>
> On Wed, 12 Apr, 2023, 12:06 pm Gen Luo,  wrote:
>
>> Hi,
>>
>> Is the job you want to start running or already finished?
>> If the job is running, this is simply a failover or a JM failover case.
>> While if the job has finished, there's no such feature that can restart
>> the job
>> automatically, AFAIK.  The job has to be submitted again.
>>
>> On Wed, Apr 12, 2023 at 10:58 AM naga sudhakar 
>> wrote:
>>
>>> Hi Team,
>>> Greetings!!
>>> Just wanted to know when job manager or task manager is being restarted,
>>> is there a way to run the existing flink jobs based on a condition? Same
>>> query when I am starting flink job fresh also.
>>>
>>> Please let me know if any more information is required from my side.
>>>
>>> Thanks & Regards
>>> Nagasudhakar  Sajja.
>>>
>>


Re: Flink Job across Data Centers

2023-04-12 Thread Andrew Otto
Hi, I asked a similar question in this thread
<https://lists.apache.org/thread/1f01zo1lqcmhvosptpjlm6k3mgx0sv1m>, which
might have some relevant info.

On Wed, Apr 12, 2023 at 7:23 AM Chirag Dewan via user 
wrote:

> Hi,
>
> Can anyone share any experience on running Flink jobs across data centers?
>
> I am trying to create a Multi site/Geo Replicated Kafka cluster. I want
> that my Flink job to be closely colocated with my Kafka multi site cluster.
> If the Flink job is bound to a single data center, I believe we will
> observe a lot of client latency by trying to access the broker in another
> DC.
>
> Rather if I can make my Flink Kafka collectors as rack aware and start
> fetching data from the closest Kafka broker, I should get better results.
>
> I will be deploying Flink 1.16 on Kubernetes with Strimzi managed Apache
> Kafka.
>
> Thanks.
>
>


Flink Job across Data Centers

2023-04-12 Thread Chirag Dewan via user
Hi,
Can anyone share any experience on running Flink jobs across data centers?
I am trying to create a multi-site/geo-replicated Kafka cluster. I want my 
Flink job to be closely colocated with my Kafka multi-site cluster. If the 
Flink job is bound to a single data center, I believe we will observe a lot of 
client latency from trying to access brokers in another DC.
Rather, if I can make my Flink Kafka consumers rack aware and have them fetch 
data from the closest Kafka broker, I should get better results.
I will be deploying Flink 1.16 on Kubernetes with Strimzi managed Apache Kafka.
Thanks.


Re: Flink job manager conditional start of flink jobs

2023-04-12 Thread naga sudhakar
Thanks for your email.
I am looking at this more in terms of running these Flink jobs in a multi-namespace
environment and making sure that only one namespace's Flink jobs are running.
So on the JobManager, when I try to start a Flink job, it has to check
whether it is allowed to run in this namespace or not; accordingly, the Flink job
should turn into the running state, otherwise it should cancel itself.



On Wed, 12 Apr, 2023, 12:06 pm Gen Luo,  wrote:

> Hi,
>
> Is the job you want to start running or already finished?
> If the job is running, this is simply a failover or a JM failover case.
> While if the job has finished, there's no such feature that can restart
> the job
> automatically, AFAIK.  The job has to be submitted again.
>
> On Wed, Apr 12, 2023 at 10:58 AM naga sudhakar 
> wrote:
>
>> Hi Team,
>> Greetings!!
>> Just wanted to know when job manager or task manager is being restarted,
>> is there a way to run the existing flink jobs based on a condition? Same
>> query when I am starting flink job fresh also.
>>
>> Please let me know if any more information is required from my side.
>>
>> Thanks & Regards
>> Nagasudhakar  Sajja.
>>
>


Re: Flink job manager conditional start of flink jobs

2023-04-12 Thread Gen Luo
Hi,

Is the job you want to start running or already finished?
If the job is running, this is simply a failover or a JM failover case.
Whereas if the job has finished, there's no such feature that can restart the
job automatically, AFAIK. The job has to be submitted again.

On Wed, Apr 12, 2023 at 10:58 AM naga sudhakar 
wrote:

> Hi Team,
> Greetings!!
> Just wanted to know when job manager or task manager is being restarted,
> is there a way to run the existing flink jobs based on a condition? Same
> query when I am starting flink job fresh also.
>
> Please let me know if any more information is required from my side.
>
> Thanks & Regards
> Nagasudhakar  Sajja.
>


Flink job manager conditional start of flink jobs

2023-04-11 Thread naga sudhakar
Hi Team,
Greetings!!
Just wanted to know: when the job manager or task manager is restarted, is
there a way to run the existing Flink jobs based on a condition? The same question
applies when I am starting a Flink job fresh.

Please let me know if any more information is required from my side.

Thanks & Regards
Nagasudhakar  Sajja.


Re:Re: Re: flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread guanyq



Understood, thank you very much.








On 2023-03-13 16:57:18, "Weihua Hu"  wrote:
>The image does not come through. You could upload it to an image hosting service and paste the link on the mailing list.
>
>Whether YARN relaunches the AM is also governed by "yarn.application-attempt-failures-validity-interval"[1];
>the application only exits once the configured number of failed attempts is reached within that interval.
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:
>
>> The image is in the attachment.
>> But in practice it exceeded 10 attempts...
>>
>>
>>
>>
>>
>>
>> On 2023-03-13 15:39:39, "Weihua Hu"  wrote:
>> >Hi,
>> >
>> >The image cannot be seen.
>> >
>> >With this configuration, YARN should only launch the JobManager 10 times.
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
>> >
>> >> Flink 1.10, with the following Flink configuration:
>> >> yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
>> >> My understanding is that YARN should try 10 times to start the Flink job and fail if it cannot come up, but on the YARN application page I see 11 attempts, as shown in the image below.
>> >> Regarding appattempt_1678102326043_0006_000409
>> >> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
>> >> doesn't each sequence number represent one attempt?
>> >>
>>
>>


Re: Re: flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread Weihua Hu
The image does not come through. You could upload it to an image hosting service and paste the link on the mailing list.

Whether YARN relaunches the AM is also governed by "yarn.application-attempt-failures-validity-interval"[1];
the application only exits once the configured number of failed attempts is reached within that interval.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
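
As a sketch, the interaction of the two settings looks like this in flink-conf.yaml; the
interval value is only an example, and if I remember correctly the default is 10000 ms
(10 seconds), which would explain why attempts spread further apart than that can keep
accumulating past 10:

yarn.application-attempts: 10
# only failures inside this window count towards the limit (milliseconds; example: 10 minutes)
yarn.application-attempt-failures-validity-interval: 600000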

Best,
Weihua


On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:

> The image is in the attachment.
> But in practice it exceeded 10 attempts...
>
>
>
>
>
>
> On 2023-03-13 15:39:39, "Weihua Hu"  wrote:
> >Hi,
> >
> >The image cannot be seen.
> >
> >With this configuration, YARN should only launch the JobManager 10 times.
> >
> >Best,
> >Weihua
> >
> >
> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
> >
> >> Flink 1.10, with the following Flink configuration:
> >> yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
> >> My understanding is that YARN should try 10 times to start the Flink job and fail if it cannot come up, but on the YARN application page I see 11 attempts, as shown in the image below.
> >> Regarding appattempt_1678102326043_0006_000409
> >> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
> >> doesn't each sequence number represent one attempt?
> >>
>
>


Re:Re: flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread guanyq
The image is in the attachment.
But in practice it exceeded 10 attempts...
















On 2023-03-13 15:39:39, "Weihua Hu"  wrote:
>Hi,
>
>The image cannot be seen.
>
>With this configuration, YARN should only launch the JobManager 10 times.
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
>
>> Flink 1.10, with the following Flink configuration:
>> yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
>> My understanding is that YARN should try 10 times to start the Flink job and fail if it cannot come up, but on the YARN application page I see 11 attempts, as shown in the image below.
>> Regarding appattempt_1678102326043_0006_000409
>> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
>> doesn't each sequence number represent one attempt?
>>


Re: flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread Weihua Hu
Hi,

The image cannot be seen.

With this configuration, YARN should only launch the JobManager 10 times.

Best,
Weihua


On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:

> Flink 1.10, with the following Flink configuration:
> yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
> My understanding is that YARN should try 10 times to start the Flink job and fail if it cannot come up, but on the YARN application page I see 11 attempts, as shown in the image below.
> Regarding appattempt_1678102326043_0006_000409
> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
> doesn't each sequence number represent one attempt?
>


flink on yarn: question about YARN retrying to restart the Flink job

2023-03-13 Thread guanyq
Flink 1.10, with the following Flink configuration:
yarn.application-attempts = 10  (YARN tries to start the Flink job 10 times)
My understanding is that YARN should try 10 times to start the Flink job and fail if it cannot come up, but on the YARN application page I see 11 attempts, as shown in the image below.
Doesn't each sequence number of appattempt_1678102326043_0006_000409 represent one attempt?


Re: Flink Job Manager Recovery from EKS Node Terminations

2023-01-11 Thread Yang Wang
First, the JobManager does not store any persistent data locally when
Kubernetes HA + S3 is used.
That means you do not need to mount a PV for the JobManager deployment.

Secondly, node failures or terminations should not cause
the CrashLoopBackOff status.
One possible reason I could imagine is bug FLINK-28265[1], which is fixed
in 1.15.3.

BTW, it would be great if you could share the logs of the initial JobManager pod
and the crashed JobManager pod.

[1]. https://issues.apache.org/jira/browse/FLINK-28265
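
For reference, a minimal flink-conf.yaml sketch of the Kubernetes HA setup being
discussed (standalone mode on Flink 1.15); the cluster id and bucket are placeholders:

kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://<bucket>/flink/my-flink-cluster/recovery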


Best,
Yang


Vijay Jammi  wrote on Fri, Jan 6, 2023 at 04:24:

> Hi,
>
> Have a query on the Job Manager HA for flink 1.15.
>
> We currently run a standalone flink cluster with a single JobManager and
> multiple TaskManagers, deployed on top of a kubernetes cluster (EKS
> cluster) in application mode (reactive mode).
>
> The Task Managers are deployed as a ReplicaSet and the single Job Manager
> is configured to be highly available using the Kubernetes HA services with
> recovery data being written to S3.
>   high-availability.storageDir:
> s3:///flink//recovery
>
> We also have configured our cluster for the rocksdb state backend with
> checkpoints being written to S3.
>   state.backend: rocksdb
>   state.checkpoints.dir:
> s3:///flink//checkpoints
>
> Now to test the Job Manager HA, when we delete the job manager deployment
> (to simulate job manager crash), we see that Kubernetes (EKS) detects
> the failure, launches a new Job Manager pod and is able to recover the
> application cluster from the last successful checkpoint (Restoring job
> 000 from Checkpoint 5 @ 167...3692 for 000 located at
> s3://.../checkpoints/0.../chk-5).
>
> However, if we terminate the underlying node (EC2 instance) on which the
> Job Manager pod is scheduled, the cluster is unable to recover from this
> scenario. What we are seeing is that Kubernetes as usual tries and retries
> repeatedly to launch a newer Job Manager but this time the job manager is
> unable to find the checkpoint to recover from (No checkpoint found during
> restore), eventually going into a CrashLoopBackOff status after max
> attempts of restart.
>
> Now the query is will the Job Manager need to be configured to store its
> state to a local working directory over persistent volumes? Any pointers on
> how we can recover the cluster from such node failures or terminations?
>
> Vijay Jammi
>


Flink Job Manager Recovery from EKS Node Terminations

2023-01-05 Thread Vijay Jammi
Hi,

Have a query on the Job Manager HA for flink 1.15.

We currently run a standalone flink cluster with a single JobManager and
multiple TaskManagers, deployed on top of a kubernetes cluster (EKS
cluster) in application mode (reactive mode).

The Task Managers are deployed as a ReplicaSet and the single Job Manager
is configured to be highly available using the Kubernetes HA services with
recovery data being written to S3.
  high-availability.storageDir:
s3:///flink//recovery

We also have configured our cluster for the rocksdb state backend with
checkpoints being written to S3.
  state.backend: rocksdb
  state.checkpoints.dir: s3:///flink//checkpoints

Now to test the Job Manager HA, when we delete the job manager deployment
(to simulate job manager crash), we see that Kubernetes (EKS) detects
the failure, launches a new Job Manager pod and is able to recover the
application cluster from the last successful checkpoint (Restoring job
000 from Checkpoint 5 @ 167...3692 for 000 located at
s3://.../checkpoints/0.../chk-5).

However, if we terminate the underlying node (EC2 instance) on which the
Job Manager pod is scheduled, the cluster is unable to recover from this
scenario. What we are seeing is that Kubernetes as usual tries and retries
repeatedly to launch a newer Job Manager but this time the job manager is
unable to find the checkpoint to recover from (No checkpoint found during
restore), eventually going into a CrashLoopBackOff status after max
attempts of restart.

Now the question is: does the Job Manager need to be configured to store its
state in a local working directory on persistent volumes? Any pointers on
how we can recover the cluster from such node failures or terminations?

Vijay Jammi


Re: How to get failed streaming Flink job log in Flink Native K8s mode?

2023-01-03 Thread Yang Wang
I think you might need a sidecar container or a DaemonSet to collect the
Flink logs and store them in persistent storage.
You can find more information here[1].

[1].
https://www.alibabacloud.com/blog/best-practices-of-kubernetes-log-collection_596356
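
A minimal pod-template sketch of the sidecar idea, assuming the default Flink log
directory /opt/flink/log and a Fluent Bit image as the (hypothetical) log shipper; with
the Flink Kubernetes operator this would go under spec.podTemplate of the
FlinkDeployment, while plain native K8s deployments reference it via
kubernetes.pod-template-file:

apiVersion: v1
kind: Pod
metadata:
  name: pod-template
spec:
  containers:
    # Flink requires the main JM/TM container to be named flink-main-container
    - name: flink-main-container
      volumeMounts:
        - name: flink-logs
          mountPath: /opt/flink/log
    # sidecar that ships the shared log directory to external storage before the pod goes away
    - name: log-shipper
      image: fluent/fluent-bit:latest
      volumeMounts:
        - name: flink-logs
          mountPath: /opt/flink/log
  volumes:
    - name: flink-logs
      emptyDir: {}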

Best,
Yang

hjw  wrote on Thu, Dec 22, 2022 at 23:28:

> On Flink Native K8s mode, the pod of JM and TM will disappear if the
> streaming job failed.Are there any ways to get the log of the failed
> Streaming job?
> I only think of a solution that is to mount job logs to NFS for
> persistence through pv-pvc defined in pod-template.
>
> ENV:
> Flink version:1.15.0
> Mode: Flink kubernetes Operator 1.2.0(Application Mode)
>
> --
> Best,
> Hjw
>


How to get failed streaming Flink job log in Flink Native K8s mode?

2022-12-22 Thread hjw
In Flink native K8s mode, the JM and TM pods disappear if the streaming 
job fails. Are there any ways to get the logs of the failed streaming job?
The only solution I can think of is to mount the job logs to NFS for persistence 
through a PV/PVC defined in the pod template.


ENV:
Flink version:1.15.0
Mode: Flink kubernetes Operator 1.2.0(Application Mode)



--

Best,
Hjw

Re: Query about flink job manager dashboard

2022-12-08 Thread naga sudhakar
Is it possible to disable the dashboard, but still have the API running
on a different port? Let us know if we can configure this.

On Mon, 5 Dec, 2022, 6:08 AM naga sudhakar,  wrote:

> Any suggestions for these apis to work after applying these configuration?
> Basically I suspect webupload directory cobfif is not taken also when these
> were set to false.
>
> web.submit.enable: Enables uploading and starting jobs through the Flink
> UI (true by default). Please note that even when this is disabled, session
> clusters still accept jobs through REST requests (HTTP calls). This flag
> only guards the feature to upload jobs in the UI.
> web.cancel.enable: Enables canceling jobs through the Flink UI (true by
> default). Please note that even when this is disabled, session clusters
> still cancel jobs through REST requests (HTTP calls). This flag only guards
> the feature to cancel jobs in the UI.
> web.upload.dir: The directory where to store uploaded jobs. Only used when
> web.submit.enable is true
>
> On Wed, 30 Nov, 2022, 10:46 AM naga sudhakar, 
> wrote:
>
>> After disabling the cancel, submit flags facing issues with below api
>> calls.
>>
>> 1) /jars giving 404
>> 2) /jars/upload
>> 3) /jars/{jarid}/run
>>
>> Is there any config changes needed to have these apis work?
>>
>>
>> On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar, 
>> wrote:
>>
>>> Hi,
>>> We are able to disable this cancela nd upload otpion in ui.
>>> But this is having issues with endpoints for below.
>>> Get call for /jars to list all uploaded jars and post call
>>> /jars/{jarid}/run are giving 404 after disabling the two flags.
>>> Is the process of uploading jars and running a jar with specific id
>>> changes after this change?
>>>
>>> Please suggest.
>>>
>>> Thanks & Regards,
>>> Nagasudhakar
>>>
>>> On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> 1) No, that's currently not possible.
>>>> 2) You could consider disabling to disallow uploading new JARs and/or
>>>> cancelling jobs from the UI. See
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>> Greetings!!!
>>>>>> I am a software developer using apache flink and deploying flink jobs
>>>>>> using the same. I have two queries about flink job manager dashboard. Can
>>>>>> you please help with below?
>>>>>>
>>>>>> 1) is it possible to add login mechanism for the flink job manager
>>>>>> dash board and have a role based mechanism for viewing running jobs,
>>>>>> cancelling jobs, adding the jobs?
>>>>>> 2) is it possible to disable to dash bord display but use api to do
>>>>>> the same operations using API?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Nagasudhakar.
>>>>>>
>>>>>


Re: Query about flink job manager dashboard

2022-12-04 Thread naga sudhakar
Any suggestions for getting these APIs to work after applying this configuration?
Basically, I suspect the web upload directory config is also not picked up when these
were set to false.

web.submit.enable: Enables uploading and starting jobs through the Flink UI
(true by default). Please note that even when this is disabled, session
clusters still accept jobs through REST requests (HTTP calls). This flag
only guards the feature to upload jobs in the UI.
web.cancel.enable: Enables canceling jobs through the Flink UI (true by
default). Please note that even when this is disabled, session clusters
still cancel jobs through REST requests (HTTP calls). This flag only guards
the feature to cancel jobs in the UI.
web.upload.dir: The directory where to store uploaded jobs. Only used when
web.submit.enable is true
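
For reference, a sketch of the two REST calls in question, assuming the default REST port
8081 and web.submit.enable left at its default of true (with it set to false these /jars
endpoints return 404, as reported above); the paths, jar id, and entry class below are
placeholders:

# upload the fat jar; the JSON response contains the generated jar id under "filename"
curl -X POST -F "jarfile=@/path/to/my-job.jar" http://<jobmanager>:8081/jars/upload

# run the uploaded jar (jar id taken from the upload response)
curl -X POST "http://<jobmanager>:8081/jars/<jar-id>/run?entry-class=com.example.MyJob&parallelism=2"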

On Wed, 30 Nov, 2022, 10:46 AM naga sudhakar, 
wrote:

> After disabling the cancel, submit flags facing issues with below api
> calls.
>
> 1) /jars giving 404
> 2) /jars/upload
> 3) /jars/{jarid}/run
>
> Is there any config changes needed to have these apis work?
>
>
> On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar, 
> wrote:
>
>> Hi,
>> We are able to disable this cancela nd upload otpion in ui.
>> But this is having issues with endpoints for below.
>> Get call for /jars to list all uploaded jars and post call
>> /jars/{jarid}/run are giving 404 after disabling the two flags.
>> Is the process of uploading jars and running a jar with specific id
>> changes after this change?
>>
>> Please suggest.
>>
>> Thanks & Regards,
>> Nagasudhakar
>>
>> On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
>> wrote:
>>
>>> Hi,
>>>
>>> 1) No, that's currently not possible.
>>> 2) You could consider disabling to disallow uploading new JARs and/or
>>> cancelling jobs from the UI. See
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
>>> wrote:
>>>
>>>> Hi Team,
>>>>> Greetings!!!
>>>>> I am a software developer using apache flink and deploying flink jobs
>>>>> using the same. I have two queries about flink job manager dashboard. Can
>>>>> you please help with below?
>>>>>
>>>>> 1) is it possible to add login mechanism for the flink job manager
>>>>> dash board and have a role based mechanism for viewing running jobs,
>>>>> cancelling jobs, adding the jobs?
>>>>> 2) is it possible to disable to dash bord display but use api to do
>>>>> the same operations using API?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Nagasudhakar.
>>>>>
>>>>


Re: Query about flink job manager dashboard

2022-11-30 Thread Chesnay Schepler
There's no way to /disable/ the UI. (But you could cut out the 
javascript stuff from the flink-dist jar)


I'm curious why you'd want that though; since it works against the REST 
API it provides a strict subset of the REST API functionality.


On 30/11/2022 16:25, Berkay Polat wrote:

Hi Chesnay,

I have a similar question on this topic. Is there an option to disable 
the frontend altogether but still use REST APIs?


Thanks

On Wed, Nov 30, 2022 at 1:37 AM Chesnay Schepler  
wrote:


There's no way to disable the jar submission in the UI but have it
still work via the REST API.

On 30/11/2022 06:16, naga sudhakar wrote:

After disabling the cancel, submit flags facing issues with below
api calls.

1) /jars giving 404
2) /jars/upload
3) /jars/{jarid}/run

Is there any config changes needed to have these apis work?


On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar,
 wrote:

Hi,
We are able to disable this cancela nd upload otpion in ui.
But this is having issues with endpoints for below.
Get call for /jars to list all uploaded jars and post call
/jars/{jarid}/run are giving 404 after disabling the two flags.
Is the process of uploading jars and running a jar with
specific id changes after this change?

Please suggest.

Thanks & Regards,
Nagasudhakar

On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser,
 wrote:

Hi,

1) No, that's currently not possible.
2) You could consider disabling to disallow uploading new
JARs and/or cancelling jobs from the UI. See

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui


Best regards,

Martijn

On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar
 wrote:

Hi Team,
Greetings!!!
I am a software developer using apache flink and
deploying flink jobs using the same. I have two
queries about flink job manager dashboard. Can
you please help with below?

1) is it possible to add login mechanism for the
        flink job manager dash board and have a role
based mechanism for viewing running jobs,
cancelling jobs, adding the jobs?
2) is it possible to disable to dash bord display
but use api to do the same operations using API?


Thanks,
Nagasudhakar.



--
*BERKAY POLAT*
Software Engineer SMTS | MuleSoft at Salesforce
Mobile: 443-710-7021





Re: Query about flink job manager dashboard

2022-11-30 Thread Berkay Polat via user
Hi Chesnay,

I have a similar question on this topic. Is there an option to disable the
frontend altogether but still use REST APIs?

Thanks

On Wed, Nov 30, 2022 at 1:37 AM Chesnay Schepler  wrote:

> There's no way to disable the jar submission in the UI but have it still
> work via the REST API.
>
> On 30/11/2022 06:16, naga sudhakar wrote:
>
> After disabling the cancel, submit flags facing issues with below api
> calls.
>
> 1) /jars giving 404
> 2) /jars/upload
> 3) /jars/{jarid}/run
>
> Is there any config changes needed to have these apis work?
>
>
> On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar, 
> wrote:
>
>> Hi,
>> We are able to disable this cancela nd upload otpion in ui.
>> But this is having issues with endpoints for below.
>> Get call for /jars to list all uploaded jars and post call
>> /jars/{jarid}/run are giving 404 after disabling the two flags.
>> Is the process of uploading jars and running a jar with specific id
>> changes after this change?
>>
>> Please suggest.
>>
>> Thanks & Regards,
>> Nagasudhakar
>>
>> On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
>> wrote:
>>
>>> Hi,
>>>
>>> 1) No, that's currently not possible.
>>> 2) You could consider disabling to disallow uploading new JARs and/or
>>> cancelling jobs from the UI. See
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
>>> wrote:
>>>
>>>> Hi Team,
>>>>> Greetings!!!
>>>>> I am a software developer using apache flink and deploying flink jobs
>>>>> using the same. I have two queries about flink job manager dashboard. Can
>>>>> you please help with below?
>>>>>
>>>>> 1) is it possible to add login mechanism for the flink job manager
>>>>> dash board and have a role based mechanism for viewing running jobs,
>>>>> cancelling jobs, adding the jobs?
>>>>> 2) is it possible to disable to dash bord display but use api to do
>>>>> the same operations using API?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Nagasudhakar.
>>>>>
>>>>
> --
*BERKAY POLAT*
Software Engineer SMTS | MuleSoft at Salesforce
Mobile: 443-710-7021



Re: Query about flink job manager dashboard

2022-11-30 Thread Chesnay Schepler
There's no way to disable the jar submission in the UI but have it still 
work via the REST API.


On 30/11/2022 06:16, naga sudhakar wrote:
After disabling the cancel, submit flags facing issues with below api 
calls.


1) /jars giving 404
2) /jars/upload
3) /jars/{jarid}/run

Is there any config changes needed to have these apis work?


On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar,  
wrote:


Hi,
We are able to disable this cancela nd upload otpion in ui.
But this is having issues with endpoints for below.
Get call for /jars to list all uploaded jars and post call
/jars/{jarid}/run are giving 404 after disabling the two flags.
Is the process of uploading jars and running a jar with specific
id changes after this change?

Please suggest.

Thanks & Regards,
Nagasudhakar

On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser,
 wrote:

Hi,

1) No, that's currently not possible.
2) You could consider disabling to disallow uploading new JARs
and/or cancelling jobs from the UI. See

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui

Best regards,

Martijn

On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar
 wrote:

Hi Team,
Greetings!!!
I am a software developer using apache flink and
deploying flink jobs using the same. I have two
queries about flink job manager dashboard. Can you
please help with below?

1) is it possible to add login mechanism for the flink
    job manager dash board and have a role based mechanism
for viewing running jobs, cancelling jobs, adding the
jobs?
2) is it possible to disable to dash bord display but
use api to do the same operations using API?


Thanks,
Nagasudhakar.



Re: Query about flink job manager dashboard

2022-11-29 Thread naga sudhakar
After disabling the cancel, submit flags facing issues with below api
calls.

1) /jars giving 404
2) /jars/upload
3) /jars/{jarid}/run

Is there any config changes needed to have these apis work?


On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar,  wrote:

> Hi,
> We are able to disable this cancela nd upload otpion in ui.
> But this is having issues with endpoints for below.
> Get call for /jars to list all uploaded jars and post call
> /jars/{jarid}/run are giving 404 after disabling the two flags.
> Is the process of uploading jars and running a jar with specific id
> changes after this change?
>
> Please suggest.
>
> Thanks & Regards,
> Nagasudhakar
>
> On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
> wrote:
>
>> Hi,
>>
>> 1) No, that's currently not possible.
>> 2) You could consider disabling to disallow uploading new JARs and/or
>> cancelling jobs from the UI. See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
>> wrote:
>>
>>> Hi Team,
>>>> Greetings!!!
>>>> I am a software developer using apache flink and deploying flink jobs
>>>> using the same. I have two queries about flink job manager dashboard. Can
>>>> you please help with below?
>>>>
>>>> 1) is it possible to add login mechanism for the flink job manager dash
>>>> board and have a role based mechanism for viewing running jobs, cancelling
>>>> jobs, adding the jobs?
>>>> 2) is it possible to disable to dash bord display but use api to do the
>>>> same operations using API?
>>>>
>>>>
>>>> Thanks,
>>>> Nagasudhakar.
>>>>
>>>


Re: Query about flink job manager dashboard

2022-11-28 Thread naga sudhakar
Hi,
We are able to disable the cancel and upload options in the UI.
But this is causing issues with the endpoints below.
The GET call to /jars to list all uploaded jars and the POST call to
/jars/{jarid}/run are giving 404 after disabling the two flags.
Has the process of uploading jars and running a jar with a specific id changed
after this change?

Please suggest.

Thanks & Regards,
Nagasudhakar

On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
wrote:

> Hi,
>
> 1) No, that's currently not possible.
> 2) You could consider disabling to disallow uploading new JARs and/or
> cancelling jobs from the UI. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
>
> Best regards,
>
> Martijn
>
> On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
> wrote:
>
>> Hi Team,
>>> Greetings!!!
>>> I am a software developer using apache flink and deploying flink jobs
>>> using the same. I have two queries about flink job manager dashboard. Can
>>> you please help with below?
>>>
>>> 1) is it possible to add login mechanism for the flink job manager dash
>>> board and have a role based mechanism for viewing running jobs, cancelling
>>> jobs, adding the jobs?
>>> 2) is it possible to disable to dash bord display but use api to do the
>>> same operations using API?
>>>
>>>
>>> Thanks,
>>> Nagasudhakar.
>>>
>>


Re: Query about flink job manager dashboard

2022-11-24 Thread naga sudhakar
Thank you for your response.

On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
wrote:

> Hi,
>
> 1) No, that's currently not possible.
> 2) You could consider disabling to disallow uploading new JARs and/or
> cancelling jobs from the UI. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
>
> Best regards,
>
> Martijn
>
> On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
> wrote:
>
>> Hi Team,
>>> Greetings!!!
>>> I am a software developer using apache flink and deploying flink jobs
>>> using the same. I have two queries about flink job manager dashboard. Can
>>> you please help with below?
>>>
>>> 1) is it possible to add login mechanism for the flink job manager dash
>>> board and have a role based mechanism for viewing running jobs, cancelling
>>> jobs, adding the jobs?
>>> 2) is it possible to disable to dash bord display but use api to do the
>>> same operations using API?
>>>
>>>
>>> Thanks,
>>> Nagasudhakar.
>>>
>>


Re: Query about flink job manager dashboard

2022-11-24 Thread Martijn Visser
Hi,

1) No, that's currently not possible.
2) You could consider disabling the ability to upload new JARs and/or to
cancel jobs from the UI. See
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
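For reference, the two flags from those docs as they would appear in
flink-conf.yaml (both default to true); note that, as the follow-up messages
above point out, disabling submission also disables the jar-related REST
endpoints:

    web.submit.enable: false
    web.cancel.enable: false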

Best regards,

Martijn

On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
wrote:

> Hi Team,
>> Greetings!!!
>> I am a software developer using apache flink and deploying flink jobs
>> using the same. I have two queries about flink job manager dashboard. Can
>> you please help with below?
>>
>> 1) is it possible to add login mechanism for the flink job manager dash
>> board and have a role based mechanism for viewing running jobs, cancelling
>> jobs, adding the jobs?
>> 2) is it possible to disable to dash bord display but use api to do the
>> same operations using API?
>>
>>
>> Thanks,
>> Nagasudhakar.
>>
>


Query about flink job manager dashboard

2022-11-23 Thread naga sudhakar
>
> Hi Team,
> Greetings!!!
> I am a software developer using Apache Flink and deploying Flink jobs
> with it. I have two queries about the Flink JobManager dashboard. Can
> you please help with the below?
>
> 1) Is it possible to add a login mechanism for the Flink JobManager
> dashboard and have a role-based mechanism for viewing running jobs,
> cancelling jobs and adding jobs?
> 2) Is it possible to disable the dashboard display but still perform the
> same operations using the API?
>
>
> Thanks,
> Nagasudhakar.
>


Query about flink job manager dashboard

2022-11-23 Thread naga sudhakar
> Hi Team,
> Greetings!!!
> I am a software developer using Apache Flink and deploying Flink jobs
> with it. I have two queries about the Flink JobManager dashboard. Can
> you please help with the below?
>
> 1) Is it possible to add a login mechanism for the Flink JobManager
> dashboard and have a role-based mechanism for viewing running jobs,
> cancelling jobs and adding jobs?
> 2) Is it possible to disable the dashboard display but still perform the
> same operations using the API?
>
>
> Thanks,
> Nagasudhakar.
>


Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread Maximilian Michels
Taking a step back here: I think this needs to be handled in the
application mode in any case. Even if we had a better parser, it would
still treat # as a comment char. The application mode needs to be fixed to
come up with an escape scheme. YAML supports this via \# but that won't
work with our parser. So it needs to be something else. In the meantime, we
could at least add support for escapes in the configuration parser.

CC dev mailing list

-Max

On Tue, Nov 8, 2022 at 2:26 PM Maximilian Michels  wrote:

> The job fails when starting because its arguments are passed through the
> Flink configuration in application deployment mode.
>
> >This is a known limit of the current Flink options parser. Refer to
> FLINK-15358[1] for more information.
>
> Exactly. The issue stems from the GlobalConfiguration#loadYAMLResource:
> https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L189
>
> It's indeed a long-standing issue. We could easily replace the parsing
> logic with a standard YAML parser, we even have Jackson with YAML support
> built into flink-core. However, I think we worry that this might be
> breaking some applications which rely on the lenient behavior of the
> existing parser.
>
> -Max
>
> On Tue, Nov 8, 2022 at 12:36 PM liuxiangcao 
> wrote:
>
>> Hi Yang,
>>
>> Do you think flink-conf not supporting `#` in FLINK-15358[1]  and Flink
>> job spec not supporting `#` are caused by some common code?   or maybe they
>> are in different code paths?  My first guess was they are in different
>> code paths. The flink-conf is parsed when starting the flink cluster while
>> job spec is parsed when starting the job application.
>>
>> On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao 
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Thanks for getting back. Could you share how to submit job to
>>> flinkk8operator in json format?
>>>
>>> We use the java Fabric8 K8 client, which serializes java
>>> FlinkDeployment objects to CustomResource YAML (see the code snippet
>>> below).  Since `#` is considered a special character denoting comments in
>>> YAML,  it should be escaped properly when YAML file is generated. We are
>>> also reading into the code to see if we can identify the place for the fix.
>>>
>>> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
>>> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
>>> import
>>> io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>>>
>>> FlinkDeployment deployment = ;
>>> CustomResourceDefinitionContext context = xxx;
>>> DefaultKubernetesClient client = xxx;
>>>
>>> client
>>>   .customResources(
>>>   context, FlinkDeployment.class, FlinkDeploymentList.class)
>>>   .inNamespace(xxx)
>>>   .withName(deploymentName)
>>>   .createOrReplace(deployment);
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang  wrote:
>>>
>>>> This is a known limit of the current Flink options parser. Refer to
>>>> FLINK-15358[1] for more information.
>>>>
>>>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Gyula Fóra  wrote on Tue, Nov 8, 2022 at 14:41:
>>>>
>>>>> It is also possible that this is a problem of the Flink native
>>>>> Kubernetes integration, we have to check where exactly it goes wrong 
>>>>> before
>>>>> we try to fix it .
>>>>>
>>>>> We simply set the args into a Flink config and pass it to the native
>>>>> deployment logic in the operator.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra  wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> How do you submit your yaml?
>>>>>>
>>>>>> It’s possible that this is not operator problem. Did you try
>>>>>> submitting the deployment in json format instead?
>>>>>>
>>>>>> If it still doesn't work please open a JIRA ticket with the details
>>>>>> to reproduce and what you have tried :)
>>>>>>
>>>>>> Cheers
>>>>>> Gyula
>>>>>>
>>>>>> On

Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread Maximilian Michels
The job fails when starting because its arguments are passed through the
Flink configuration in application deployment mode.

>This is a known limit of the current Flink options parser. Refer to
FLINK-15358[1] for more information.

Exactly. The issue stems from the GlobalConfiguration#loadYAMLResource:
https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L189

It's indeed a long-standing issue. We could easily replace the parsing
logic with a standard YAML parser, we even have Jackson with YAML support
built into flink-core. However, I think we worry that this might be
breaking some applications which rely on the lenient behavior of the
existing parser.
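To make the truncation concrete, here is a tiny standalone reproduction against
GlobalConfiguration (the key name and the temp directory are made up; assumes
flink-core on the classpath):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class HashTruncationDemo {
    public static void main(String[] args) throws Exception {
        // Write a minimal flink-conf.yaml with a '#' inside the value.
        Path confDir = Files.createTempDirectory("conf");
        Files.writeString(confDir.resolve("flink-conf.yaml"), "my.key: abc#def\n");

        Configuration conf = GlobalConfiguration.loadConfiguration(confDir.toString());
        // Prints "abc" -- everything after '#' is dropped as a comment by the
        // lenient line-based parser, which is what truncates the program args.
        System.out.println(conf.getString("my.key", "<missing>"));
    }
}
```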

-Max

On Tue, Nov 8, 2022 at 12:36 PM liuxiangcao  wrote:

> Hi Yang,
>
> Do you think flink-conf not supporting `#` in FLINK-15358[1]  and Flink
> job spec not supporting `#` are caused by some common code?   or maybe they
> are in different code paths?  My first guess was they are in different
> code paths. The flink-conf is parsed when starting the flink cluster while
> job spec is parsed when starting the job application.
>
> On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao 
> wrote:
>
>> Hi Gyula,
>>
>> Thanks for getting back. Could you share how to submit job to
>> flinkk8operator in json format?
>>
>> We use the java Fabric8 K8 client, which serializes java FlinkDeployment 
>> objects
>> to CustomResource YAML (see the code snippet below).  Since `#` is
>> considered a special character denoting comments in YAML,  it should be
>> escaped properly when YAML file is generated. We are also reading into the
>> code to see if we can identify the place for the fix.
>>
>> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
>> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
>> import
>> io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>>
>> FlinkDeployment deployment = ;
>> CustomResourceDefinitionContext context = xxx;
>> DefaultKubernetesClient client = xxx;
>>
>> client
>>   .customResources(
>>   context, FlinkDeployment.class, FlinkDeploymentList.class)
>>   .inNamespace(xxx)
>>   .withName(deploymentName)
>>   .createOrReplace(deployment);
>>
>>
>>
>>
>>
>> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang  wrote:
>>
>>> This is a known limit of the current Flink options parser. Refer to
>>> FLINK-15358[1] for more information.
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>>
>>> Best,
>>> Yang
>>>
>>> Gyula Fóra  wrote on Tue, Nov 8, 2022 at 14:41:
>>>
>>>> It is also possible that this is a problem of the Flink native
>>>> Kubernetes integration, we have to check where exactly it goes wrong before
>>>> we try to fix it .
>>>>
>>>> We simply set the args into a Flink config and pass it to the native
>>>> deployment logic in the operator.
>>>>
>>>> Gyula
>>>>
>>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra  wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> How do you submit your yaml?
>>>>>
>>>>> It’s possible that this is not operator problem. Did you try
>>>>> submitting the deployment in json format instead?
>>>>>
>>>>> If it still doesn't work please open a JIRA ticket with the details to
>>>>> reproduce and what you have tried :)
>>>>>
>>>>> Cheers
>>>>> Gyula
>>>>>
>>>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a job that contains `#` as part of mainArgs and it used to
>>>>>> work on Ververica. Now we are switching to our own control plane to 
>>>>>> deploy
>>>>>> to flink-operaotor and the job started to fail due to the main args 
>>>>>> string
>>>>>> getting truncated at `#` character when passed to flink application. I
>>>>>> believe this is due to characters after `#` being interpreted as comments
>>>>>> in yaml file. To support having `#` in the mainArgs, the flink operator
>>>>>> needs to escape `#` when generating k8 yaml file.
>>>>>>

Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread liuxiangcao
Hi Yang,

Do you think flink-conf not supporting `#` in FLINK-15358 [1] and the Flink job
spec not supporting `#` are caused by some common code, or are they in
different code paths? My first guess was that they are in different code
paths: the flink-conf is parsed when starting the Flink cluster, while the job
spec is parsed when starting the job application.

On Tue, Nov 8, 2022 at 3:27 AM liuxiangcao  wrote:

> Hi Gyula,
>
> Thanks for getting back. Could you share how to submit job to
> flinkk8operator in json format?
>
> We use the java Fabric8 K8 client, which serializes java FlinkDeployment 
> objects
> to CustomResource YAML (see the code snippet below).  Since `#` is
> considered a special character denoting comments in YAML,  it should be
> escaped properly when YAML file is generated. We are also reading into the
> code to see if we can identify the place for the fix.
>
> import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
> import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
> import
> io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
>
> FlinkDeployment deployment = ;
> CustomResourceDefinitionContext context = xxx;
> DefaultKubernetesClient client = xxx;
>
> client
>   .customResources(
>   context, FlinkDeployment.class, FlinkDeploymentList.class)
>   .inNamespace(xxx)
>   .withName(deploymentName)
>   .createOrReplace(deployment);
>
>
>
>
>
> On Tue, Nov 8, 2022 at 2:41 AM Yang Wang  wrote:
>
>> This is a known limit of the current Flink options parser. Refer to
>> FLINK-15358[1] for more information.
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>>
>> Best,
>> Yang
>>
>> Gyula Fóra  wrote on Tue, Nov 8, 2022 at 14:41:
>>
>>> It is also possible that this is a problem of the Flink native
>>> Kubernetes integration, we have to check where exactly it goes wrong before
>>> we try to fix it .
>>>
>>> We simply set the args into a Flink config and pass it to the native
>>> deployment logic in the operator.
>>>
>>> Gyula
>>>
>>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra  wrote:
>>>
>>>> Hi!
>>>>
>>>> How do you submit your yaml?
>>>>
>>>> It’s possible that this is not operator problem. Did you try submitting
>>>> the deployment in json format instead?
>>>>
>>>> If it still doesn't work please open a JIRA ticket with the details to
>>>> reproduce and what you have tried :)
>>>>
>>>> Cheers
>>>> Gyula
>>>>
>>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have a job that contains `#` as part of mainArgs and it used to
>>>>> work on Ververica. Now we are switching to our own control plane to deploy
>>>>> to flink-operaotor and the job started to fail due to the main args string
>>>>> getting truncated at `#` character when passed to flink application. I
>>>>> believe this is due to characters after `#` being interpreted as comments
>>>>> in yaml file. To support having `#` in the mainArgs, the flink operator
>>>>> needs to escape `#` when generating k8 yaml file.
>>>>>
>>>>> Assuming the mainArgs contain '\"xyz#abc\".
>>>>>
>>>>> Here is the stack-trace:
>>>>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>>>>> not parse value '\"xyz' *(Note: truncated by #)*
>>>>>
>>>>> for key  '$internal.application.program-args'.\n\tat
>>>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>>>>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>>>>>  
>>>>> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>>>>>  
>>>>> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>>>>>  
>>>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>>>>>  
>>>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(Kuber

Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread liuxiangcao
Hi Gyula,

Thanks for getting back. Could you share how to submit a job to the Flink
Kubernetes operator in JSON format?

We use the Java Fabric8 Kubernetes client, which serializes Java
FlinkDeployment objects
to CustomResource YAML (see the code snippet below). Since `#` is a
special character denoting comments in YAML, it should be
escaped properly when the YAML file is generated. We are also reading through
the code to see if we can identify the place for the fix.

import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkDeploymentList;
import
io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;

FlinkDeployment deployment = ;
CustomResourceDefinitionContext context = xxx;
DefaultKubernetesClient client = xxx;

client
  .customResources(
  context, FlinkDeployment.class, FlinkDeploymentList.class)
  .inNamespace(xxx)
  .withName(deploymentName)
  .createOrReplace(deployment);





On Tue, Nov 8, 2022 at 2:41 AM Yang Wang  wrote:

> This is a known limit of the current Flink options parser. Refer to
> FLINK-15358[1] for more information.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-15358
>
> Best,
> Yang
>
> Gyula Fóra  wrote on Tue, Nov 8, 2022 at 14:41:
>
>> It is also possible that this is a problem of the Flink native Kubernetes
>> integration, we have to check where exactly it goes wrong before we try to
>> fix it .
>>
>> We simply set the args into a Flink config and pass it to the native
>> deployment logic in the operator.
>>
>> Gyula
>>
>> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> How do you submit your yaml?
>>>
>>> It’s possible that this is not operator problem. Did you try submitting
>>> the deployment in json format instead?
>>>
>>> If it still doesn't work please open a JIRA ticket with the details to
>>> reproduce and what you have tried :)
>>>
>>> Cheers
>>> Gyula
>>>
>>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao 
>>> wrote:
>>>
 Hi,

 We have a job that contains `#` as part of mainArgs and it used to work
 on Ververica. Now we are switching to our own control plane to deploy to
 flink-operaotor and the job started to fail due to the main args string
 getting truncated at `#` character when passed to flink application. I
 believe this is due to characters after `#` being interpreted as comments
 in yaml file. To support having `#` in the mainArgs, the flink operator
 needs to escape `#` when generating k8 yaml file.

 Assuming the mainArgs contain '\"xyz#abc\".

 Here is the stack-trace:
 {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
 not parse value '\"xyz' *(Note: truncated by #)*

 for key  '$internal.application.program-args'.\n\tat
 org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
 org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
  
 org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
  
 org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
  
 org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
  
 org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
  by: *java.lang.IllegalArgumentException: Could not split string. Quoting 
 was not closed properly*.\n\tat 
 org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
  
 org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
  
 org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
  
 org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
  
 org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
  java.base/java.util.Optional.map(Optional.java:265)\n\tat 
 org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
  5 more\n"},"@version":1,"source_host":"xx","message":"Could not 
 create application 
 program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}


  Can someone take a look and help fixing this issue? or I can help
 fixing this if someone can point me in the right direction.

 --
 Best Wishes & Regards
 Shawn Xiangcao Liu

>>>

-- 
Best Wishes & Regards
Shawn Xiangcao Liu


Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread Yang Wang
This is a known limit of the current Flink options parser. Refer to
FLINK-15358[1] for more information.

[1]. https://issues.apache.org/jira/browse/FLINK-15358

Best,
Yang

Gyula Fóra  wrote on Tue, Nov 8, 2022 at 14:41:

> It is also possible that this is a problem of the Flink native Kubernetes
> integration, we have to check where exactly it goes wrong before we try to
> fix it .
>
> We simply set the args into a Flink config and pass it to the native
> deployment logic in the operator.
>
> Gyula
>
> On Tue, 8 Nov 2022 at 07:37, Gyula Fóra  wrote:
>
>> Hi!
>>
>> How do you submit your yaml?
>>
>> It’s possible that this is not operator problem. Did you try submitting
>> the deployment in json format instead?
>>
>> If it still doesn't work please open a JIRA ticket with the details to
>> reproduce and what you have tried :)
>>
>> Cheers
>> Gyula
>>
>> On Tue, 8 Nov 2022 at 04:56, liuxiangcao  wrote:
>>
>>> Hi,
>>>
>>> We have a job that contains `#` as part of mainArgs and it used to work
>>> on Ververica. Now we are switching to our own control plane to deploy to
>>> flink-operaotor and the job started to fail due to the main args string
>>> getting truncated at `#` character when passed to flink application. I
>>> believe this is due to characters after `#` being interpreted as comments
>>> in yaml file. To support having `#` in the mainArgs, the flink operator
>>> needs to escape `#` when generating k8 yaml file.
>>>
>>> Assuming the mainArgs contain '\"xyz#abc\".
>>>
>>> Here is the stack-trace:
>>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>>> not parse value '\"xyz' *(Note: truncated by #)*
>>>
>>> for key  '$internal.application.program-args'.\n\tat
>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>>>  
>>> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>>>  
>>> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>>>  
>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>>>  
>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
>>>  by: *java.lang.IllegalArgumentException: Could not split string. Quoting 
>>> was not closed properly*.\n\tat 
>>> org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
>>>  
>>> org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
>>>  
>>> org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
>>>  
>>> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
>>>  
>>> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
>>>  java.base/java.util.Optional.map(Optional.java:265)\n\tat 
>>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
>>>  5 more\n"},"@version":1,"source_host":"xx","message":"Could not create 
>>> application 
>>> program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>>
>>>
>>>  Can someone take a look and help fixing this issue? or I can help
>>> fixing this if someone can point me in the right direction.
>>>
>>> --
>>> Best Wishes & Regards
>>> Shawn Xiangcao Liu
>>>
>>


Re: support escaping `#` in flink job spec in Flink-operator

2022-11-07 Thread Gyula Fóra
It is also possible that this is a problem of the Flink native Kubernetes
integration, we have to check where exactly it goes wrong before we try to
fix it .

We simply set the args into a Flink config and pass it to the native
deployment logic in the operator.

Gyula

On Tue, 8 Nov 2022 at 07:37, Gyula Fóra  wrote:

> Hi!
>
> How do you submit your yaml?
>
> It’s possible that this is not operator problem. Did you try submitting
> the deployment in json format instead?
>
> If it still doesn't work please open a JIRA ticket with the details to
> reproduce and what you have tried :)
>
> Cheers
> Gyula
>
> On Tue, 8 Nov 2022 at 04:56, liuxiangcao  wrote:
>
>> Hi,
>>
>> We have a job that contains `#` as part of mainArgs and it used to work
>> on Ververica. Now we are switching to our own control plane to deploy to
>> flink-operaotor and the job started to fail due to the main args string
>> getting truncated at `#` character when passed to flink application. I
>> believe this is due to characters after `#` being interpreted as comments
>> in yaml file. To support having `#` in the mainArgs, the flink operator
>> needs to escape `#` when generating k8 yaml file.
>>
>> Assuming the mainArgs contain '\"xyz#abc\".
>>
>> Here is the stack-trace:
>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>> not parse value '\"xyz' *(Note: truncated by #)*
>>
>> for key  '$internal.application.program-args'.\n\tat
>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>>  
>> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>>  
>> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>>  
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>>  
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
>>  by: *java.lang.IllegalArgumentException: Could not split string. Quoting 
>> was not closed properly*.\n\tat 
>> org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
>>  
>> org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
>>  
>> org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
>>  
>> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
>>  
>> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
>>  java.base/java.util.Optional.map(Optional.java:265)\n\tat 
>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
>>  5 more\n"},"@version":1,"source_host":"xx","message":"Could not create 
>> application 
>> program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>
>>
>>  Can someone take a look and help fixing this issue? or I can help fixing
>> this if someone can point me in the right direction.
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>


Re: support escaping `#` in flink job spec in Flink-operator

2022-11-07 Thread Gyula Fóra
Hi!

How do you submit your yaml?

It's possible that this is not an operator problem. Did you try submitting the
deployment in JSON format instead?

If it still doesn't work please open a JIRA ticket with the details to
reproduce and what you have tried :)

Cheers
Gyula

On Tue, 8 Nov 2022 at 04:56, liuxiangcao  wrote:

> Hi,
>
> We have a job that contains `#` as part of mainArgs and it used to work on
> Ververica. Now we are switching to our own control plane to deploy to
> flink-operaotor and the job started to fail due to the main args string
> getting truncated at `#` character when passed to flink application. I
> believe this is due to characters after `#` being interpreted as comments
> in yaml file. To support having `#` in the mainArgs, the flink operator
> needs to escape `#` when generating k8 yaml file.
>
> Assuming the mainArgs contain '\"xyz#abc\".
>
> Here is the stack-trace:
> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
> not parse value '\"xyz' *(Note: truncated by #)*
>
> for key  '$internal.application.program-args'.\n\tat
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>  
> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>  
> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>  
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>  
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
>  by: *java.lang.IllegalArgumentException: Could not split string. Quoting was 
> not closed properly*.\n\tat 
> org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
>  
> org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
>  
> org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
>  
> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
>  
> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
>  java.base/java.util.Optional.map(Optional.java:265)\n\tat 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
>  5 more\n"},"@version":1,"source_host":"xx","message":"Could not create 
> application 
> program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>
>
>  Can someone take a look and help fixing this issue? or I can help fixing
> this if someone can point me in the right direction.
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>


support escaping `#` in flink job spec in Flink-operator

2022-11-07 Thread liuxiangcao
Hi,

We have a job that contains `#` as part of mainArgs and it used to work on
Ververica. Now we are switching to our own control plane to deploy to the
flink-operator, and the job started to fail because the main args string
gets truncated at the `#` character when passed to the Flink application. I
believe this is due to characters after `#` being interpreted as comments
in the YAML file. To support having `#` in the mainArgs, the Flink operator
needs to escape `#` when generating the k8s YAML file.

Assuming the mainArgs contain '\"xyz#abc\"'.

Here is the stack-trace:
{"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
not parse value '\"xyz' *(Note: truncated by #)*

for key  '$internal.application.program-args'.\n\tat
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
by: *java.lang.IllegalArgumentException: Could not split string.
Quoting was not closed properly*.\n\tat
org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
java.base/java.util.Optional.map(Optional.java:265)\n\tat
org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
5 more\n"},"@version":1,"source_host":"xx","message":"Could not
create application
program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}


 Can someone take a look and help fixing this issue? or I can help fixing
this if someone can point me in the right direction.

-- 
Best Wishes & Regards
Shawn Xiangcao Liu


Re: Seeking advice: how to release resources held by an object in a Flink Job

2022-07-12 Thread Weihua Hu
Hi,

Sharing variables across multiple Tasks inside a TaskManager is not recommended.
Each Task should use its own resources: initialize them when the RichFunction is
opened and release them when it is closed. Otherwise it is easy to end up with
resource leaks.
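For illustration, a minimal sketch of that per-Task pattern, using a plain
RestHighLevelClient inside a RichMapFunction (class, host and method names here
are made up for the example, not taken from the original job):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class EsEnrichFunction extends RichMapFunction<String, String> {

    private transient RestHighLevelClient client;

    @Override
    public void open(Configuration parameters) {
        // one client per parallel subtask, owned by this Task only
        client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("es-host", 9200, "http")));
    }

    @Override
    public String map(String value) throws Exception {
        // ... query Elasticsearch via `client` and enrich `value` ...
        return value;
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close(); // released even when the job fails over or is cancelled
        }
    }
}
```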

Best,
Weihua


On Tue, Jul 12, 2022 at 2:31 PM RS  wrote:

> Hi,
>
>
> If you are accessing Elasticsearch, Flink ships with an ES connector that you
> can use directly, or you can use its source code as a reference; both the
> source and the sink interfaces have the corresponding methods.
>
>
>
> Whether the resource lives in a single thread depends on your code logic. If
> different threads or processes are involved, then by design you should not
> share a single EsClientHolder; each stage should create and close its own
> object instead.
>
>
> Thanks
>
>
> On 2022-07-12 12:35:31, "Bruce Zu"  wrote:
> > Hi Flink team,
> > I have a fairly general question about how to release resources held by
> > an object in a Flink job.
> >
> > I am new to Flink. I have searched a lot but could not find the relevant
> > documentation, though I am sure there is a standard way to handle this.
> >
> > My Flink application needs to access an Elasticsearch server. We use a
> > class EsClient, extended from
> > org.elasticsearch.client.RestHighLevelClient, to run the queries, and its
> > `close` method has to be called to release the resources once it is no
> > longer used.
> >
> > So I need to find the right place to make sure the resources are always
> > released, even if an exception is thrown somewhere along the call path.
> >
> > What I can think of for now is to use a `ThreadLocal` to keep the
> > EsClient object created at the beginning of the main method of the main
> > class, and to release the resources at the end of the main method.
> >
> > Pseudocode along these lines:
> > ```java
> > public class EsClientHolder {
> >   private static final ThreadLocal<EsClient> local = new
> > InheritableThreadLocal<>();
> >
> >   public static void createAndSetEsClient(EsClient esClient) {
> >     local.set(esClient);
> >   }
> >
> >   public static void createAndSetEsClientBy(EsClientConfig esClientConfig) {
> >     EsClient instance = new EsClient(esClientConfig);
> >     createAndSetEsClient(instance);
> >   }
> >
> >   public static EsClient get() {
> >     EsClient c = local.get();
> >     if (c == null) {
> >       throw new RuntimeException("Make sure an EsClient instance is created and set before use");
> >     }
> >     return c;
> >   }
> >
> >   public static void close() throws IOException {
> >     EsClient o = local.get();
> >     if (o != null) {
> >       o.close();
> >     }
> >   }
> > }
> >
> > // Usage in the Flink application code
> > public class MainClass {
> >   public static void main(String[] args) throws IOException {
> >     try {
> >       Properties prop = null;
> >       EsClientConfig config = getEsClientConfig(prop);
> >       EsClientHolder.createAndSetEsClientBy(config);
> >       // …
> >       SomeClass.method1();
> >       OtherClass.method2();
> >       // ...
> >     } finally {
> >       EsClientHolder.close();
> >     }
> >   }
> > }
> >
> > class SomeClass {
> >   public void method1() {
> >     // 1. Use EsClient in any calling method of any other class:
> >     EsClient esClient = EsClientHolder.get();
> >     // …
> >   }
> > }
> >
> > class OtherClass {
> >   public void method2() {
> >     // 2. Use EsClient in any calling method of any forked child thread
> >     new Thread(
> >         () -> {
> >           EsClient client = EsClientHolder.get();
> >           // …
> >         })
> >         .start();
> >     // …
> >   }
> > }
> > ```
> >
> > I know that a TaskManager is a Java JVM process and that a Task is
> > executed by a Java Thread.
> >
> > But I do not know how Flink builds the job graph, how Flink eventually
> > assigns tasks to threads, or how those threads relate to each other.
> >
> > For example, if Flink assigns SomeClass.method1 and OtherClass.method2 to
> > a thread different from the one running MainClass, then the threads
> > running method1 and method2 have no way to get hold of the EsClient.
> > Here I assume the main method of MainClass runs in a single thread. If it
> > does not, for example if set() and close() end up running in different
> > threads, then there is no way to release the resources.
> >
> > Thanks!
>


Re: Seeking advice: how to release resources held by an object in a Flink Job

2022-07-12 Thread RS
Hi,


If you are accessing Elasticsearch, Flink ships with an ES connector that you can use directly, or you can use its source code as a reference; both the source and the sink interfaces have the corresponding methods.
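For example, a short sketch of the bundled connector on the sink side, assuming
the Flink 1.15+ unified sink API (flink-connector-elasticsearch7); the builder
calls follow the connector docs as I recall them, and the host and index names
are made up:

```java
import java.util.Collections;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class EsSinkSketch {
    public static void attach(DataStream<String> stream) {
        // The connector owns the REST client lifecycle, so nothing to close by hand.
        stream.sinkTo(
                new Elasticsearch7SinkBuilder<String>()
                        .setHosts(new HttpHost("es-host", 9200, "http"))
                        .setBulkFlushMaxActions(100)
                        .setEmitter((element, context, indexer) ->
                                indexer.add(createIndexRequest(element)))
                        .build());
    }

    private static IndexRequest createIndexRequest(String element) {
        // index name and document shape are made up for the sketch
        return Requests.indexRequest()
                .index("my-index")
                .source(Collections.singletonMap("data", element));
    }
}
```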


Whether the resource lives in a single thread depends on your code logic. If different threads or processes are involved, then by design you should not share a single EsClientHolder; each stage should create and close its own object instead.


Thanks


On 2022-07-12 12:35:31, "Bruce Zu"  wrote:
> Hi Flink team,
> I have a fairly general question about how to release resources held by an
> object in a Flink job.
>
> I am new to Flink. I have searched a lot but could not find the relevant
> documentation, though I am sure there is a standard way to handle this.
>
> My Flink application needs to access an Elasticsearch server. We use a class
> EsClient, extended from org.elasticsearch.client.RestHighLevelClient, to run
> the queries, and its `close` method has to be called to release the resources
> once it is no longer used.
>
> So I need to find the right place to make sure the resources are always
> released, even if an exception is thrown somewhere along the call path.
>
> What I can think of for now is to use a `ThreadLocal` to keep the EsClient
> object created at the beginning of the main method of the main class, and to
> release the resources at the end of the main method.
>
> Pseudocode along these lines:
> ```java
> public class EsClientHolder {
>   private static final ThreadLocal<EsClient> local = new
> InheritableThreadLocal<>();
>
>   public static void createAndSetEsClient(EsClient esClient) {
>     local.set(esClient);
>   }
>
>   public static void createAndSetEsClientBy(EsClientConfig esClientConfig) {
>     EsClient instance = new EsClient(esClientConfig);
>     createAndSetEsClient(instance);
>   }
>
>   public static EsClient get() {
>     EsClient c = local.get();
>     if (c == null) {
>       throw new RuntimeException("Make sure an EsClient instance is created and set before use");
>     }
>     return c;
>   }
>
>   public static void close() throws IOException {
>     EsClient o = local.get();
>     if (o != null) {
>       o.close();
>     }
>   }
> }
>
> // Usage in the Flink application code
> public class MainClass {
>   public static void main(String[] args) throws IOException {
>     try {
>       Properties prop = null;
>       EsClientConfig config = getEsClientConfig(prop);
>       EsClientHolder.createAndSetEsClientBy(config);
>       // …
>       SomeClass.method1();
>       OtherClass.method2();
>       // ...
>     } finally {
>       EsClientHolder.close();
>     }
>   }
> }
>
> class SomeClass {
>   public void method1() {
>     // 1. Use EsClient in any calling method of any other class:
>     EsClient esClient = EsClientHolder.get();
>     // …
>   }
> }
>
> class OtherClass {
>   public void method2() {
>     // 2. Use EsClient in any calling method of any forked child thread
>     new Thread(
>         () -> {
>           EsClient client = EsClientHolder.get();
>           // …
>         })
>         .start();
>     // …
>   }
> }
> ```
>
> I know that a TaskManager is a Java JVM process and that a Task is executed
> by a Java Thread.
>
> But I do not know how Flink builds the job graph, how Flink eventually
> assigns tasks to threads, or how those threads relate to each other.
>
> For example, if Flink assigns SomeClass.method1 and OtherClass.method2 to a
> thread different from the one running MainClass, then the threads running
> method1 and method2 have no way to get hold of the EsClient.
> Here I assume the main method of MainClass runs in a single thread. If it
> does not, for example if set() and close() end up running in different
> threads, then there is no way to release the resources.
>
> Thanks!


Seeking advice: how to release resources held by an object in a Flink Job

2022-07-11 Thread Bruce Zu
Hi Flink team,
I have a fairly general question about how to release resources held by an
object in a Flink job.

I am new to Flink. I have searched a lot but could not find the relevant
documentation, though I am sure there is a standard way to handle this.

My Flink application needs to access an Elasticsearch server. We use a class
EsClient, extended from org.elasticsearch.client.RestHighLevelClient, to run
the queries, and its `close` method has to be called to release the resources
once it is no longer used.

So I need to find the right place to make sure the resources are always
released, even if an exception is thrown somewhere along the call path.

What I can think of for now is to use a `ThreadLocal` to keep the EsClient
object created at the beginning of the main method of the main class, and to
release the resources at the end of the main method.

Pseudocode along these lines:
```java
public class EsClientHolder {
  private static final ThreadLocal<EsClient> local = new
InheritableThreadLocal<>();

  public static void createAndSetEsClient(EsClient esClient) {
    local.set(esClient);
  }

  public static void createAndSetEsClientBy(EsClientConfig esClientConfig) {
    EsClient instance = new EsClient(esClientConfig);
    createAndSetEsClient(instance);
  }

  public static EsClient get() {
    EsClient c = local.get();
    if (c == null) {
      throw new RuntimeException("Make sure an EsClient instance is created and set before use");
    }
    return c;
  }

  public static void close() throws IOException {
    EsClient o = local.get();
    if (o != null) {
      o.close();
    }
  }
}

// Usage in the Flink application code
public class MainClass {
  public static void main(String[] args) throws IOException {
    try {
      Properties prop = null;
      EsClientConfig config = getEsClientConfig(prop);
      EsClientHolder.createAndSetEsClientBy(config);
      // …
      SomeClass.method1();
      OtherClass.method2();
      // ...
    } finally {
      EsClientHolder.close();
    }
  }
}

class SomeClass {
  public void method1() {
    // 1. Use EsClient in any calling method of any other class:
    EsClient esClient = EsClientHolder.get();
    // …
  }
}

class OtherClass {
  public void method2() {
    // 2. Use EsClient in any calling method of any forked child thread
    new Thread(
        () -> {
          EsClient client = EsClientHolder.get();
          // …
        })
        .start();
    // …
  }
}
```

I know that a TaskManager is a Java JVM process and that a Task is executed by
a Java Thread.

But I do not know how Flink builds the job graph, how Flink eventually assigns
tasks to threads, or how those threads relate to each other.

For example, if Flink assigns SomeClass.method1 and OtherClass.method2 to a
thread different from the one running MainClass, then the threads running
method1 and method2 have no way to get hold of the EsClient.
Here I assume the main method of MainClass runs in a single thread. If it does
not, for example if set() and close() end up running in different threads,
then there is no way to release the resources.

Thanks!


Re: Is there an HA solution to run flink job with multiple source

2022-06-02 Thread Bariša Obradović
Hi,
our use case is that the data sources are independent: we are using Flink to
ingest data from Kafka sources, do a bit of filtering and then write the data
to S3.
Since we ingest from multiple Kafka sources, and they are independent, we
consider them all optional. Even if just one Kafka is up and running, we
would like to process its data.

We use a single Flink job, since we find it easier to manage fewer Flink
jobs, and that way we also use fewer resources.

So far, the idea from Xuyang seems doable to me; I'll explore the idea of
subclassing the existing Kafka source and making sure that the source can
keep functioning even if Kafka is down.
In essence, we would like to treat the situation of Kafka being down the
same as Kafka being up but having no data.
The caveat I can think of is to add metrics and logs when Kafka is down,
so we can still be aware of it if we need to.
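A very rough sketch of that idea, assuming the legacy SourceFunction-based
consumer (e.g. FlinkKafkaConsumer); it deliberately ignores checkpointing, the
delegate's rich-function lifecycle and metrics, all of which would need real
handling before this could be used:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Wraps a delegate source and keeps the subtask alive when the delegate fails,
// so one unreachable broker does not restart the whole job.
public class OptionalSourceWrapper<T> implements SourceFunction<T> {

    private final SourceFunction<T> delegate;
    private volatile boolean running = true;

    public OptionalSourceWrapper(SourceFunction<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        try {
            delegate.run(ctx);
        } catch (Exception e) {
            // log + metric here; then idle as if the topic simply had no data
            while (running) {
                Thread.sleep(1000L);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
        delegate.cancel();
    }
}
```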

Cheers,
Barisa

On Wed, 1 Jun 2022 at 23:23, Alexander Fedulov 
wrote:

> Hi Bariša,
>
> The way I see it is you either
> - need data from all sources because you are doing some
> conjoint processing. In that case stopping the pipeline is usually the
> right thing to do.
> - the streams consumed from multiple servers are not combined and hence
> could be processed in independent Flink jobs.
> Maybe you could explain where specifically your situation does not fit in
> one of those two scenarios?
>
> Best,
> Alexander Fedulov
>
>
> On Wed, Jun 1, 2022 at 10:57 PM Jing Ge  wrote:
>
>> Hi Bariša,
>>
>> Could you share the reason why your data processing pipeline should keep
>> running when one kafka source is down?
>> It seems like any one among the multiple kafka sources is optional for
>> the data processing logic, because any kafka source could be the one that
>> is down.
>>
>> Best regards,
>> Jing
>>
>> On Wed, Jun 1, 2022 at 5:59 PM Xuyang  wrote:
>>
>>> I think you can try to use a custom source to do that although the one
>>> of the kafka sources is down the operator is also running(just do nothing).
>>> The only trouble is that you need to manage the checkpoint and something
>>> else yourself. But the good news is that you can copy the implementation of
>>> existing kafka source and change a little code conveniently.
>>>
>>> At 2022-06-01 22:38:39, "Bariša Obradović"  wrote:
>>>
>>> Hi,
>>> we are running a flink job with multiple kafka sources connected to
>>> different kafka servers.
>>>
>>> The problem we are facing is when one of the kafka's is down, the flink
>>> job starts restarting.
>>> Is there anyway for flink to pause processing of the kafka which is
>>> down, and yet continue processing from other sources?
>>>
>>> Cheers,
>>> Barisa
>>>
>>>

