Re: Flink Kubernetes HA

2023-12-06 Thread Ethan T Yang
Never mind. The issue is fixed; it was caused by the service account missing the 
“patch” verb permission, which kept the RPC service from starting.
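For reference, a minimal RBAC sketch of what the fix looks like. The names below (the "flink" service account, role and namespace) are placeholders and not taken from the thread; the key point is that "patch" must be among the ConfigMap verbs, since the Kubernetes HA services patch the leader ConfigMaps:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-role
  namespace: flink
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-role-binding
  namespace: flink
subjects:
  - kind: ServiceAccount
    name: flink
    namespace: flink
roleRef:
  kind: Role
  name: flink-ha-role
  apiGroup: rbac.authorization.k8s.io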

> On Dec 5, 2023, at 1:40 PM, Ethan T Yang  wrote:
> 
> Hi Flink users,
> After upgrading Flink (from 1.13.1 to 1.18.0), I noticed an issue when 
> HA is enabled (see exception below). I am using a k8s deployment, and I cleaned 
> the previous ConfigMaps (leader files etc.). I know Pekko is a recent 
> change. Can someone share docs on how to use or configure it? When I disable 
> HA, the deployment was successful. I also noticed a new ConfigMap called 
> “-cluster-config-map”; can someone provide a reference on what it is for? I 
> don’t see it in the 1.13.1 version.
> 
> Thanks a lot
> Ivan
> 
> 
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could 
> not send message 
> [LocalRpcInvocation(RestfulGateway.requestMultipleJobDetails(Time))] from 
> sender [unknown] to recipient 
> [pekko.tcp://flink@flink-secondary-jobmanager:6123/user/rpc/dispatcher_1], 
> because the recipient is unreachable. This can either mean that the recipient 
> has been terminated or that the remote RpcService is currently not reachable.
>   at com.sun.proxy.$Proxy55.requestMultipleJobDetails(Unknown Source) 
> ~[?:?]
>   at 
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler.handleRequest(JobsOverviewHandler.java:65)
>  ~[flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
>  ~[flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>  ~[flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>  ~[flink-dist-1.18.0.jar:1.18.0]
>   at java.util.Optional.ifPresent(Unknown Source) [?:?]
>   at 
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
> [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [flink-dist-1.18.0.jar:1.18.0]
>   at 
> org.a

Re: Flink Kubernetes HA

2023-12-06 Thread Zhanghao Chen
Hi Ethan,

Pekko is basically a fork of Akka from before its license change, so the usage is 
almost the same. From the exception posted, it looks like you are trying to 
connect to a terminated dispatcher, which usually indicates some exception on the 
JobManager side. You can try checking the JM log for more clues.

"-cluster-config-map" is introduced by the single-leader election change of 
HA. In 1.13, each subcomponent of the jobmanager: rest endpoint, dispatcher, 
resourcemanager and jobmasters do leader election separately. In higher 
versions, a single leader election is performed with a unified config map for 
doing that.
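As an illustration only (not from the thread; the cluster id and storage path below are placeholders), the Kubernetes HA setup being discussed is typically enabled with flink-conf.yaml entries like the following, and with 1.18 this produces the single "<cluster-id>-cluster-config-map" used for the unified leader election:

high-availability.type: kubernetes
kubernetes.cluster-id: my-flink-cluster
high-availability.storageDir: s3://my-bucket/flink/ha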

Best,
Zhanghao Chen

From: Ethan T Yang 
Sent: Wednesday, December 6, 2023 5:40
To: user@flink.apache.org 
Subject: Flink Kubernetes HA

Hi Flink users,
After upgrading Flink (from 1.13.1 to 1.18.0), I noticed an issue when HA 
is enabled (see exception below). I am using a k8s deployment, and I cleaned the 
previous ConfigMaps (leader files etc.). I know Pekko is a recent change. 
Can someone share docs on how to use or configure it? When I disable HA, the 
deployment was successful. I also noticed a new ConfigMap called 
“-cluster-config-map”; can someone provide a reference on what it is for? I 
don’t see it in the 1.13.1 version.

Thanks a lot
Ivan



org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could 
not send message 
[LocalRpcInvocation(RestfulGateway.requestMultipleJobDetails(Time))] from 
sender [unknown] to recipient 
[pekko.tcp://flink@flink-secondary-jobmanager:6123/user/rpc/dispatcher_1], 
because the recipient is unreachable. This can either mean that the recipient has 
been terminated or that the remote RpcService is currently not reachable.

at com.sun.proxy.$Proxy55.requestMultipleJobDetails(Unknown Source) ~[?:?]

at 
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler.handleRequest(JobsOverviewHandler.java:65)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]

at java.util.Optional.ifPresent(Unknown Source) [?:?]

at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.net

Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread elakiya udhayanan
Hi Team,
 I would like to know whether it is possible to have two sinks in a single Flink
job. In my case I am using a Flink SQL based job where I consume
from two different Kafka topics using CREATE TABLE DDL (as below), then
use a join condition to correlate them and at present write the result to an
external database (PostgreSQL) as a sink. I would like to know if I can
add another sink where I also write to a Kafka topic (as the
second sink).
I tried using two SQL scripts (two CREATE and two INSERT statements) but
was facing an exception* "Cannot have more than one execute() or
executeAsync() call in a single environment. at "*
I also tried the StatementSet functionality, which again gave me an
exception *"org.apache.flink.table.api.TableException: Only insert
statement is supported now. at ".*
I am looking for some help in this regard. TIA

*Note:* I am using the Flink UI to submit my job.

*Sample DDL statement used:*String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
+
")";

Thanks,
Elakiya


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Zhanghao Chen
Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert 
statements, then using StatementSet.addInsertSql to add multiple insert 
statements, and finally calling StatementSet#execute.
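A minimal sketch of that route using the public Table API (the class name, table names, connector options and queries below are placeholders, not the actual job):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoSinksSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Non-insert statements (DDL) are executed directly; each runs eagerly.
        // Fill in the real schemas and connector options.
        tableEnv.executeSql("CREATE TABLE person_source ( ... ) WITH ( ... )");
        tableEnv.executeSql("CREATE TABLE postgres_sink ( ... ) WITH ( ... )");
        tableEnv.executeSql("CREATE TABLE kafka_sink ( ... ) WITH ( ... )");

        // Both INSERTs go into one StatementSet, so they are submitted as a
        // single job with two sinks instead of calling execute() twice.
        StatementSet stmtSet = tableEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO postgres_sink SELECT ... FROM ...");
        stmtSet.addInsertSql("INSERT INTO kafka_sink SELECT ... FROM ...");
        stmtSet.execute();
    }
}

Adding the INSERTs to the statement set and calling execute() once avoids the "Cannot have more than one execute() or executeAsync() call" error, while passing a plain SELECT to addInsertSql is what triggers the "Only insert statement is supported now" exception mentioned above.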

Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya


Re:Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Xuyang
Hi, Elakiya.
Are you following the example here[1]? Could you attach a minimal, reproducible 
SQL?


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/







--

Best!
Xuyang




At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA


Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya

Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread elakiya udhayanan
Hi Xuyang, Zhanghao,

Thanks for your response. I have attached sample job files that I tried
with the StatementSet and with two queries. Please let me know if you are
able to point out where I am possibly going wrong.

Thanks,
Elakiya

On Wed, Dec 6, 2023 at 4:51 PM Xuyang  wrote:

> Hi, Elakiya.
> Are you following the example here[1]? Could you attach a minimal,
> reproducible SQL?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>
>
>
> --
> Best!
> Xuyang
>
>
> At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:
>
> Hi Team,
>  I would like to know the possibility of having two sinks in a
> single Flink job. In my case I am using the Flink SQL based job where I try
> to consume from two different Kafka topics using the create table (as
> below) DDL and then use a join condition to correlate them and at present
> write it to an external database (PostgreSQL - as a sink). I would like to
> know if I can add another sink where I want to also write it to kafka topic
> (as the second sink).
> I tried using two sql scripts (two create and two insert for the same) but
> was facing an exception* "Cannot have more than one execute() or
> executeAsync() call in a single environment. at "*
> Also tried to use the StatementSet functionality which again gave me an
> exception *"org.apache.flink.table.api.TableException: Only insert
> statement is supported now. at ".*
> I am looking for some help in regards to this. TIA
>
> *Note:* I am using the Flink UI to submit my job.
>
> *Sample DDL statement used:*String statement = "CREATE TABLE Person
> (\r\n" +
> "  person ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
>
> Thanks,
> Elakiya
>
>


TwoSinks.java
Description: Binary data


TwoSinks2.java
Description: Binary data


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Feng Jin
Hi Elakiya,


You should use DML in the statement set instead of DQL.


Here is a simple example:

executeSql("CREATE TABLE source_table1 ..");

executeSql("CREATE TABLE source_table2 ..");

executeSql("CREATE TABLE sink_table1 ..");

executeSql("CREATE TABLE sink_table1 ..");


stmtSet.addInsertSql("INSERT INTO sink_tabl1 SELECT xxx from  source_table1
join source_table2 ...");

stmtSet.addInsertSql("INSERT INTO sink_tabl2 SELECT xxx from  source_table1
join source_table2 ...");

stmtSet.execute();


Best,
Feng


On Thu, Dec 7, 2023 at 12:48 AM elakiya udhayanan 
wrote:

> Hi Xuyang, Zhangao,
>
> Thanks for your response, I have attached sample job files that I tried
> with the Statementset and with two queries. Please let me know if you are
> able to point out where I am possibly going wrong.
>
> Thanks,
> Elakiya
>
> On Wed, Dec 6, 2023 at 4:51 PM Xuyang  wrote:
>
>> Hi, Elakiya.
>> Are you following the example here[1]? Could you attach a minimal,
>> reproducible SQL?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>>
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:
>>
>> Hi Team,
>>  I would like to know the possibility of having two sinks in a
>> single Flink job. In my case I am using the Flink SQL based job where I try
>> to consume from two different Kafka topics using the create table (as
>> below) DDL and then use a join condition to correlate them and at present
>> write it to an external database (PostgreSQL - as a sink). I would like to
>> know if I can add another sink where I want to also write it to kafka topic
>> (as the second sink).
>> I tried using two sql scripts (two create and two insert for the same)
>> but was facing an exception* "Cannot have more than one execute() or
>> executeAsync() call in a single environment. at "*
>> Also tried to use the StatementSet functionality which again gave me an
>> exception *"org.apache.flink.table.api.TableException: Only insert
>> statement is supported now. at ".*
>> I am looking for some help in regards to this. TIA
>>
>> *Note:* I am using the Flink UI to submit my job.
>>
>> *Sample DDL statement used:*String statement = "CREATE TABLE Person
>> (\r\n" +
>> "  person ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
>> +
>> ")";
>>
>> Thanks,
>> Elakiya
>>
>>


Reading text file from S3

2023-12-06 Thread Fourais
Hi,

Using Flink 1.18 and Java 17, I am trying to read a text file from S3 using
env.readTextFile("s3://mybucket/folder1/file.txt"). When I run the app in
the IDE, I get the following error:

Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS
Credentials provided by DynamicTemporaryAWSCredentialsProvider
TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider
EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider :
com.amazonaws.SdkClientException: Unable to load AWS credentials from
environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and
AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

I authenticated into AWS using SSO e.g. aws sso login --profile
my-aws-profile, so I do not have any keys set as environment variables. I
have tried the different CredentialsProvider options suggested in the error
message without success.

Could you help me identify what I am missing?

Thank you very much for your help,
/Fourais
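As a hedged sketch only (not from the thread): one direction is to point the s3a filesystem at a profile-based credentials provider via flink-conf.yaml, since Flink forwards s3.* keys to the Hadoop S3A configuration. Whether the bundled AWS SDK can resolve an SSO-backed profile is an assumption to verify, not a confirmed fix:

# Hedged sketch. Assumes the flink-s3-fs-hadoop plugin is on the plugins path
# and that the provider class (from the AWS SDK v1) can resolve the profile;
# with SSO it may instead be necessary to export short-lived keys into the
# environment the IDE/JVM inherits.
# The profile is typically selected via the AWS_PROFILE environment variable.
s3.aws.credentials.provider: com.amazonaws.auth.profile.ProfileCredentialsProvider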


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Chen Yu
Hi Chen,
You should tell Flink which table to insert into with “INSERT INTO xxx SELECT xxx”.

For a single non-insert query, Flink will collect the output to the console 
automatically, so it also works without an INSERT.

But you must specify the target table explicitly when you need to write data to 
external storage.

Like,

String relateQuery = "insert into xxx select correlator_id, name, relationship 
from Correlation";


Best,
Yu Chen

Get Outlook for iOS

From: Zhanghao Chen 
Sent: Wednesday, December 6, 2023 7:21:50 PM
To: elakiya udhayanan ; user@flink.apache.org 

Subject: Re: Query on using two sinks for a Flink job (Flink SQL)

Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert 
statements, then using StatementSet.addInsertSql to add multiple insertion 
statetments, and finally calling StatementSet#execute.

Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya


[flink-k8s-connector] In-place scaling up often takes several times till it succeeds.

2023-12-06 Thread Xiaolong Wang
Hi,

I'm playing with a Flink 1.18 demo with the auto-scaler and the adaptive
scheduler.

The operator can correctly collect data and order the job to scale up, but
it takes the job several attempts to reach the required parallelism.

E.g. The original parallelism for each vertex is something like below:

Vertex : write hourly_ads s3 || Parallelism : 100
Vertex : Source: ads-kafkasource -> Timestamps/Watermarks ->
ads-filter-action -> ads-cast || Parallelism : 100
Vertex : Commit hourly_ads || Parallelism : 1


Then the operator decides to scale the job to this:

Vertex : write hourly_ads s3 || Parallelism : 200
Vertex : Source: ads-kafkasource -> Timestamps/Watermarks ->
ads-filter-action -> ads-cast || Parallelism : 100
Vertex : Commit hourly_ads || Parallelism : 1


But when scaling, the vertex write hourly_ads s3 does not scale directly from
100 to 200; it first scales to an intermediate number, say 120, then 150, then 180.
It takes 3~4 rescalings for the job to reach the required parallelism.

I'm wondering how to avoid this issue?

Thanks in advance.
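As a hedged illustration only (not from the thread; option names assume Flink 1.18's adaptive scheduler): the stepwise behavior is consistent with the adaptive scheduler rescaling each time a batch of new slots stabilizes. Two flink-conf.yaml settings that make it wait for a larger batch of resources before each in-place rescale look like this, with the values below being placeholders:

# Hedged sketch. The defaults are much lower (on the order of 10s and 1),
# which is why each small batch of new slots can trigger another rescale.
jobmanager.adaptive-scheduler.resource-stabilization-timeout: 60s
jobmanager.adaptive-scheduler.min-parallelism-increase: 32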