[flink-k8s-connector] In-place scaling up often takes several attempts until it succeeds.

2023-12-06 Thread Xiaolong Wang
Hi,

I'm playing with a Flink 1.18 demo with the auto-scaler and the adaptive
scheduler.

The operator can correctly collect data and order the job to scale up, but
it takes the job several rescalings to reach the required parallelism.

E.g. The original parallelism for each vertex is something like below:

Vertex : write hourly_ads s3 || Parallelism : 100
Vertex : Source: ads-kafkasource -> Timestamps/Watermarks ->
ads-filter-action -> ads-cast || Parallelism : 100
Vertex : Commit hourly_ads || Parallelism : 1


Then the operator decides to scale the job to this:

Vertex : write hourly_ads s3 || Parallelism : 200
Vertex : Source: ads-kafkasource -> Timestamps/Watermarks ->
ads-filter-action -> ads-cast || Parallelism : 100
Vertex : Commit hourly_ads || Parallelism : 1


But when scaling, the vertex write hourly_ads s3 does not scale directly from
100 to 200; it first scales to an intermediate value, say 120, then 150, then
180. It takes 3~4 rescalings until the job reaches the required parallelism.

I'm wondering how to avoid this issue?
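
For reference, this is roughly what I am considering tuning (only a sketch;
whether these adaptive-scheduler keys are the right knobs is an assumption on
my part, and in an operator-managed deployment they would go into the
FlinkDeployment's flinkConfiguration rather than into code):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
conf.setString("jobmanager.scheduler", "Adaptive");
// Assumption: waiting longer for all newly requested slots before restarting
// lets the job jump closer to the target parallelism in one rescale instead of 3~4.
conf.setString("jobmanager.adaptive-scheduler.resource-stabilization-timeout", "60 s");
// Assumption: only rescale when the parallelism gain is large enough to justify a restart.
conf.setString("jobmanager.adaptive-scheduler.min-parallelism-increase", "20");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);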

Thanks in advance.


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Chen Yu
Hi Chen,
You should tell Flink which table to insert into with an "INSERT INTO XXX SELECT XXX" statement.

For a single non-INSERT query, Flink collects the output to the console
automatically, so it also works without adding an INSERT.

But you must name the target table explicitly when you need to write data to
external storage.

For example:

String relateQuery = "INSERT INTO xxx SELECT correlator_id, name, relationship FROM Correlation";
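
And then submit it through the table environment, e.g. (a sketch; tableEnv is
assumed to be your existing TableEnvironment):

// Submits the INSERT as a Flink job; the returned TableResult can be awaited if needed.
TableResult result = tableEnv.executeSql(relateQuery);
result.await();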


Best,
Yu Chen

Get Outlook for iOS

From: Zhanghao Chen 
Sent: Wednesday, December 6, 2023 7:21:50 PM
To: elakiya udhayanan ; user@flink.apache.org 

Subject: Re: Query on using two sinks for a Flink job (Flink SQL)

Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert 
statements, then using StatementSet.addInsertSql to add multiple insert 
statements, and finally calling StatementSet#execute.

Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya


Reading text file from S3

2023-12-06 Thread Fourais
Hi,

Using Flink 1.18 and Java 17, I am trying to read a text file from S3 using
env.readTextFile("s3://mybucket/folder1/file.txt"). When I run the app in
the IDE, I get the following error:

Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS
Credentials provided by DynamicTemporaryAWSCredentialsProvider
TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider
EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider :
com.amazonaws.SdkClientException: Unable to load AWS credentials from
environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

I authenticated into AWS using SSO e.g. aws sso login --profile
my-aws-profile, so I do not have any keys set as environment variables. I
have tried the different CredentialsProvider options suggested in the error
message without success.

Could you help me identify what I am missing?
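
One workaround I am experimenting with is pointing s3a at a profile-based
credentials provider instead of environment variables (only a sketch, with
the assumptions spelled out in the comments):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3ReadSketch {
    public static void main(String[] args) throws Exception {
        // Assumption: flink-s3-fs-hadoop is available and Flink forwards "s3."-prefixed
        // keys to Hadoop's fs.s3a.* configuration. Whether the bundled AWS SDK v1
        // ProfileCredentialsProvider understands an SSO-backed profile is also an
        // assumption; on a cluster the same key would go into flink-conf.yaml instead.
        Configuration conf = new Configuration();
        conf.setString("s3.aws.credentials.provider",
                "com.amazonaws.auth.profile.ProfileCredentialsProvider");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.readTextFile("s3://mybucket/folder1/file.txt").print();
        env.execute("read-s3");
    }
}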

Thank you very much for your help,
/Fourais


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Feng Jin
Hi Elakiya,


You should use DML statements in the statement set instead of DQL.


Here is a simple example:

tableEnv.executeSql("CREATE TABLE source_table1 ..");
tableEnv.executeSql("CREATE TABLE source_table2 ..");
tableEnv.executeSql("CREATE TABLE sink_table1 ..");
tableEnv.executeSql("CREATE TABLE sink_table2 ..");

StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink_table1 SELECT xxx FROM source_table1 JOIN source_table2 ...");
stmtSet.addInsertSql("INSERT INTO sink_table2 SELECT xxx FROM source_table1 JOIN source_table2 ...");

stmtSet.execute();


Best,
Feng


On Thu, Dec 7, 2023 at 12:48 AM elakiya udhayanan 
wrote:

> Hi Xuyang, Zhangao,
>
> Thanks for your response, I have attached sample job files that I tried
> with the Statementset and with two queries. Please let me know if you are
> able to point out where I am possibly going wrong.
>
> Thanks,
> Elakiya
>
> On Wed, Dec 6, 2023 at 4:51 PM Xuyang  wrote:
>
>> Hi, Elakiya.
>> Are you following the example here[1]? Could you attach a minimal,
>> reproducible SQL?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>>
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:
>>
>> Hi Team,
>>  I would like to know the possibility of having two sinks in a
>> single Flink job. In my case I am using the Flink SQL based job where I try
>> to consume from two different Kafka topics using the create table (as
>> below) DDL and then use a join condition to correlate them and at present
>> write it to an external database (PostgreSQL - as a sink). I would like to
>> know if I can add another sink where I want to also write it to kafka topic
>> (as the second sink).
>> I tried using two sql scripts (two create and two insert for the same)
>> but was facing an exception* "Cannot have more than one execute() or
>> executeAsync() call in a single environment. at "*
>> Also tried to use the StatementSet functionality which again gave me an
>> exception *"org.apache.flink.table.api.TableException: Only insert
>> statement is supported now. at ".*
>> I am looking for some help in regards to this. TIA
>>
>> *Note:* I am using the Flink UI to submit my job.
>>
>> *Sample DDL statement used:*String statement = "CREATE TABLE Person
>> (\r\n" +
>> "  person ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
>> +
>> ")";
>>
>> Thanks,
>> Elakiya
>>
>>


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread elakiya udhayanan
Hi Xuyang, Zhangao,

Thanks for your response, I have attached sample job files that I tried
with the Statementset and with two queries. Please let me know if you are
able to point out where I am possibly going wrong.

Thanks,
Elakiya

On Wed, Dec 6, 2023 at 4:51 PM Xuyang  wrote:

> Hi, Elakiya.
> Are you following the example here[1]? Could you attach a minimal,
> reproducible SQL?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>
>
>
> --
> Best!
> Xuyang
>
>
> At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:
>
> Hi Team,
>  I would like to know the possibility of having two sinks in a
> single Flink job. In my case I am using the Flink SQL based job where I try
> to consume from two different Kafka topics using the create table (as
> below) DDL and then use a join condition to correlate them and at present
> write it to an external database (PostgreSQL - as a sink). I would like to
> know if I can add another sink where I want to also write it to kafka topic
> (as the second sink).
> I tried using two sql scripts (two create and two insert for the same) but
> was facing an exception* "Cannot have more than one execute() or
> executeAsync() call in a single environment. at "*
> Also tried to use the StatementSet functionality which again gave me an
> exception *"org.apache.flink.table.api.TableException: Only insert
> statement is supported now. at ".*
> I am looking for some help in regards to this. TIA
>
> *Note:* I am using the Flink UI to submit my job.
>
> *Sample DDL statement used:*String statement = "CREATE TABLE Person
> (\r\n" +
> "  person ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
>
> Thanks,
> Elakiya
>
>


TwoSinks.java
Description: Binary data


TwoSinks2.java
Description: Binary data


Re:Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Xuyang
Hi, Elakiya.
Are you following the example here[1]? Could you attach a minimal, reproducible 
SQL?


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/







--

Best!
Xuyang




At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA


Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya

Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Zhanghao Chen
Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert 
statements, then using StatementSet.addInsertSql to add multiple insert 
statements, and finally calling StatementSet#execute.
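
For example, a runnable sketch of that structure (the datagen/print connectors
here only stand in for your Kafka sources and the PostgreSQL/Kafka sinks, and
all table and column names are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoSinksSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Non-INSERT statements (DDL etc.) are executed one by one.
        tableEnv.executeSql("CREATE TABLE person (id STRING, name STRING) WITH ('connector' = 'datagen')");
        tableEnv.executeSql("CREATE TABLE employee (id STRING, dept STRING) WITH ('connector' = 'datagen')");
        tableEnv.executeSql("CREATE TABLE sink_a (id STRING, name STRING, dept STRING) WITH ('connector' = 'print')");
        tableEnv.executeSql("CREATE TABLE sink_b (id STRING, name STRING) WITH ('connector' = 'print')");

        // All INSERTs are added to one StatementSet and submitted as a single job.
        StatementSet stmtSet = tableEnv.createStatementSet();
        stmtSet.addInsertSql(
                "INSERT INTO sink_a SELECT p.id, p.name, e.dept FROM person p JOIN employee e ON p.id = e.id");
        stmtSet.addInsertSql("INSERT INTO sink_b SELECT id, name FROM person");
        stmtSet.execute();
    }
}

Because all INSERTs go through one StatementSet, they are compiled into a
single job and submitted with a single execute() call, which avoids the
"Cannot have more than one execute() or executeAsync() call" error.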

Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya


Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread elakiya udhayanan
Hi Team,
 I would like to know whether it is possible to have two sinks in a single Flink
job. In my case I am running a Flink SQL based job that consumes from two
different Kafka topics using CREATE TABLE DDL (as below), uses a join condition
to correlate them, and at present writes the result to an external database
(PostgreSQL) as a sink. I would like to know if I can add another sink so that
the result is also written to a Kafka topic (as the second sink).
I tried using two SQL scripts (two CREATE and two INSERT statements) but
was facing the exception* "Cannot have more than one execute() or
executeAsync() call in a single environment. at "*
I also tried the StatementSet functionality, which again gave me an
exception *"org.apache.flink.table.api.TableException: Only insert
statement is supported now. at ".*
I am looking for some help regarding this. TIA

*Note:* I am using the Flink UI to submit my job.

*Sample DDL statement used:*String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
+
")";

Thanks,
Elakiya


Re: Flink dirty data handling

2023-12-06 Thread Xuyang
Hi,
Regarding actively collecting dirty data in Flink SQL, there are currently two feasible approaches:
1. If you know what format the dirty data has, tag the dirty records so that they skip the normal processing logic and are only collected, and have a UDAF be responsible for cancelling the job once a certain amount has accumulated.
2. If you do not know what format the dirty data has, use a UDX at the node that processes the data to handle both the normal and the dirty records, count the dirty records at the same time, and throw an exception once an upper limit is reached.


However, throwing an exception inside the UDX will probably only cause the job to fail over; it will not put the job into a terminal FAILED state.


To make the job actually reach the FAILED state, and if the dirty data can already be recognized on the source side, you would need to customize the source connector so that it stops emitting data downstream once it has seen a certain amount of dirty data. See [1] for details.


[1] 
https://stackoverflow.com/questions/1153/how-to-stop-a-flink-streaming-job-from-program
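
A minimal sketch of approach 2 (assumptions: "dirty" here simply means a value
that fails to parse, the class and function names are invented for
illustration, and the counter is per parallel subtask rather than global):

import org.apache.flink.table.functions.ScalarFunction;

// Invented example UDF: parses a string column, counts rows that fail to parse,
// and throws once a per-subtask threshold is exceeded (which, as noted above,
// normally only triggers a failover rather than a terminal FAILED state).
public class DirtyCountingParser extends ScalarFunction {
    private static final long MAX_DIRTY = 1000L;
    private long dirtyCount = 0L;

    public Long eval(String raw) {
        try {
            return Long.parseLong(raw);   // normal processing path
        } catch (NumberFormatException e) {
            dirtyCount++;                 // "collect" the dirty record here, e.g. also log it
            if (dirtyCount > MAX_DIRTY) {
                throw new RuntimeException("Too many dirty records: " + dirtyCount, e);
            }
            return null;                  // route dirty rows to NULL instead of failing
        }
    }
}

// Registration (sketch):
// tableEnv.createTemporarySystemFunction("parse_or_null", DirtyCountingParser.class);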



--

Best!
Xuyang





At 2023-12-06 15:26:56, "刘建" wrote:
>Hi: I want to use Flink SQL for data synchronization, e.g. reading data from MySQL and writing it to MySQL. If there is dirty data along the way, the downstream cannot write it.
>How can I collect this dirty data, and how can I make the task fail once the dirty data reaches a certain amount, etc.?


Re: Flink Kubernetes HA

2023-12-06 Thread Zhanghao Chen
Hi Ethan,

Pekko is basically a fork of Akka from before its license change, so the usage is 
almost the same. From the exception posted, it looks like you are trying to 
connect to a terminated dispatcher, which usually indicates some exception 
on the JobManager side. You can try checking the JM log to find more clues.

"-cluster-config-map" was introduced by the single-leader-election change of 
HA. In 1.13, each subcomponent of the JobManager (REST endpoint, dispatcher, 
resourcemanager and jobmasters) did leader election separately. In higher 
versions, a single leader election is performed, with one unified config map 
used for it.

Best,
Zhanghao Chen

From: Ethan T Yang 
Sent: Wednesday, December 6, 2023 5:40
To: user@flink.apache.org 
Subject: Flink Kubernetes HA

Hi Flink users,
After upgrading Flink (from 1.13.1 to 1.18.0), I noticed an issue when HA 
is enabled (see exception below). I am using a k8s deployment and I cleaned the 
previous configmaps, like the leader files etc. I know Pekko is a recent 
thing. Can someone share docs on how to use or configure it? When I disable HA, the 
deployment was successful. I also noticed a new configmap called 
"-cluster-config-map"; can someone provide a reference on what it is for? I 
don't see it in the 1.13.1 version.

Thanks a lot
Ivan



org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could 
not send message 
[LocalRpcInvocation(RestfulGateway.requestMultipleJobDetails(Time))] from 
sender [unknown] to recipient 
[pekko.tcp://flink@flink-secondary-jobmanager:6123/user/rpc/dispatcher_1], 
because the recipient is unreachable. This can either mean that the recipient 
has been terminated or that the remote RpcService is currently not reachable.

at com.sun.proxy.$Proxy55.requestMultipleJobDetails(Unknown Source) ~[?:?]

at 
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler.handleRequest(JobsOverviewHandler.java:65)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]

at java.util.Optional.ifPresent(Unknown Source) [?:?]

at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]

at 

Re: Flink Kubernetes HA

2023-12-06 Thread Ethan T Yang
Never mind. The issue was fixed: the service account permission was missing the 
"patch" verb, which led to the RPC service not being started.

> On Dec 5, 2023, at 1:40 PM, Ethan T Yang  wrote:
> 
> Hi Flink users,
> After upgrading Flink ( from 1.13.1 -> 1.18.0), I noticed the an issue when 
> HA is enabled.( see exception below). I am using k8s deployment and I clean 
> the previous configmaps, like leader files etc. I know the pekko is a 
> recently thing. Can someone share doc on how to use or set it? When I disable 
> HA, the deployment was successful. I also noticed a new configmap called 
> “-cluster-config-map”, can someone provide reference on what it is for? I 
> don’t see it in the 1.13.1 version.
> 
> Thanks a lot
> Ivan
> 
> 
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could 
> not send message 
> [LocalRpcInvocation(RestfulGateway.requestMultipleJobDetails(Time))] from 
> sender [unknown] to recipient [pe
> kko.tcp://flink@flink-secondary-jobmanager:6123/user/rpc/dispatcher_1], 
> because the recipient is unreachable. This can either mean that the recipient 
> has been terminated or that the remote RpcService i
> s currently not reachable.
>   at com.sun.proxy.$Proxy55.requestMultipleJobDetails(Unknown Source) 
> ~[?:?]
>   at 
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler.handleRequest(JobsOverviewHandler.java:65)
>  ~[flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
>  ~[flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>  ~[flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>  ~[flink-dist-1.18.0.jar:1.18.0]
>   at java.util.Optional.ifPresent(Unknown Source) [?:?]
>   at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
> [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
>