RE: Can flink dynamically detect high load and increase the job parallelism automatically?

2022-09-12 Thread Erez Yaakov
Thanks Zhanghao for your reply.

Do you know, approximately, what the future plans are for maturing this 
feature and making it available in application mode on native k8s?

Thanks,
Erez

From: zhanghao.c...@outlook.com 
Sent: Tuesday, September 13, 2022 4:30 AM
To: Erez Yaakov ; user@flink.apache.org
Subject: Re: Can flink dynamically detect high load and increase the job 
parallelism automatically?



Hi Erez,

Unfortunately, autoscaling for streaming jobs is only available with reactive 
mode, which, as you've already pointed out, is still an MVP feature and only 
supports standalone mode. Some vendors (e.g. Ververica) have already shipped 
their own private implementations of Flink autoscaling, though.

Best,
Zhanghao Chen
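
For reference, enabling reactive mode today is a single cluster-level setting, 
but only on a standalone (application) cluster; a minimal sketch of the 
relevant flink-conf.yaml entry:

# flink-conf.yaml -- standalone application clusters only (MVP feature)
scheduler-mode: reactive

In this mode the job ignores configured parallelism and always runs at the 
maximum parallelism the registered TaskManagers can provide, so scaling is 
done by adding or removing TaskManagers.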

From: Erez Yaakov <erez.yaa...@niceactimize.com>
Sent: Monday, September 12, 2022 21:38
To: user@flink.apache.org
Subject: Can flink dynamically detect high load and increase the job 
parallelism automatically?


Hi,



When running a streaming job that uses a Kafka source, is it possible (without 
reactive mode) for Flink to dynamically detect high load (high consumer lag, 
high CPU usage...) and increase the job parallelism automatically?



I am running a Flink streaming job on an application-mode cluster using native 
k8s.

My streaming job consumes messages from a Kafka topic with 16 partitions; 
parallelism.default is set to 2, and no parallelism is set specifically on the 
operators/sources/sinks.



I tried to send messages to the Kafka topic at a high rate, faster than the 
job could consume them, and I saw that the consumer lag was increasing. I also 
saw in the Flink UI that the source task was turning red, indicating high 
utilization of this task.

Even though I created a high load on the job, I didn't see Flink automatically 
change the parallelism of the job to handle it.

Is it possible for Flink to increase the parallelism of my job (or of my 
source) dynamically based on the current load (and add TaskManagers 
automatically)? Or is this behavior only available by using reactive mode?



For reactive mode, my understanding based on the documentation is that it is 
in an MVP state, is only supported in standalone mode, and is not yet ready 
for production use.



Thanks,

Erez



Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-12 Thread zhanghao.chen
@h.yuan...@gmail.com Any thoughts on this?

Best,
Zhanghao Chen

From: hjw <1010445...@qq.com>
Sent: Tuesday, September 13, 2022 11:24
To: zhanghao.chen ; user 
Subject: Re: A classloading question about submitting Flink jobs on Yarn-Perjob 
Mode

Hi,

The yarn.classpath.include-user-jar parameter appears as 
yarn.per-job-cluster.include-user-jar in Flink 1.14.
I have tried DISABLED, FIRST, LAST, and ORDER, but the error still happened.

Best,
Hjw



-- Original --
From: "zhanghao.chen" ;
Date: Tue, Sep 13, 2022 09:42 AM
To: "hjw"<1010445...@qq.com>;"user";
Subject: Re: A classloading question about submitting Flink jobs on Yarn-Perjob 
Mode

Hi,

Did you set any additional classloading-related configs (esp. the 
yarn.classpath.include-user-jar parameter)?

Best,
Zhanghao Chen

From: hjw <1010445...@qq.com>
Sent: Tuesday, September 13, 2022 1:58
To: user 
Subject: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

When I submit a job to YARN in per-job mode, an error occurs during the 
client-side generation of the JobGraph submitted to YARN.

Error:
java.lang.NoClassDefFoundError: org/apache/orc/PhysicalWriter

Because the cluster is public, the related 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is already present in the 
/opt/flink/lib directory.

However, this class is included in my jar: it is provided by the orc-core 
package, which I have bundled into my jar.

After some attempts, either of the following measures solves my problem:
1. Remove flink-orc_2.12-1.14.0-csa1.6.1.0.jar from the /opt/flink/lib 
directory and package it into my jar.
2. Put orc-core into the /opt/flink/lib directory.

However, I would like to know why an error occurs when 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar is placed in the /opt/flink/lib directory 
and orc-core is packaged into my jar.


Env:
Flink version: flink 1.14.0


Best,
Hjw



Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-09-12 Thread Yun Tang
An interesting topic. I noticed that the DataHub community has opened a 
feature-request discussion on Flink integration [1].

@Martijn Visser Has the Flink community 
created tickets to track this topic?
From my current understanding, Flink's JobListener lacks the rich information 
needed to send data lineage to external systems, just as Feng mentioned; Spark 
already supports this well.



[1] https://feature-requests.datahubproject.io/p/flink-integration


Best
Yun Tang
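
For context, a minimal sketch of what a JobListener can observe today; the 
lineage push itself is left as a comment, since that is exactly the missing 
piece discussed above:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LineageListenerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient client, Throwable failure) {
                // Only coarse-grained information (essentially the job ID) is
                // available here -- no per-source/sink lineage.
                if (client != null) {
                    System.out.println("Submitted job " + client.getJobID());
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable failure) {
                // A real integration would push metadata to DataHub/Amundsen here.
            }
        });

        env.fromElements(1, 2, 3).print();
        env.execute("lineage-listener-example");
    }
}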

From: wangqinghuan <1095193...@qq.com>
Sent: Monday, January 17, 2022 18:27
To: user@flink.apache.org 
Subject: Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration


We are using Datahub to address table-level lineage and column-level lineage 
for Flink SQL.

On 2022/1/13 23:27, Martijn Visser wrote:
Hi everyone,

I'm currently checking out different metadata platforms, such as Amundsen [1] 
and Datahub [2]. In short, these types of tools try to address problems related 
to topics such as data discovery, data lineage and an overall data catalogue.

I'm reaching out to the Dev and User mailing lists to get some feedback. It 
would really help if you could spend a couple of minutes to let me know whether 
you already use either of the two mentioned metadata platforms (or another 
one), or are evaluating such tools. If so, is that for use as a catalogue, for 
lineage, or anything else? Any feedback on these types of tools is appreciated.

Best regards,

Martijn

[1] https://github.com/amundsen-io/amundsen/
[2] https://github.com/linkedin/datahub



Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-12 Thread hjw
Hi,


The yarn.classpath.include-user-jar parameter appears as 
yarn.per-job-cluster.include-user-jar in Flink 1.14.
I have tried DISABLED, FIRST, LAST, and ORDER, but the error still happened.

Best,
Hjw


 




-- Original --
From: "zhanghao.chen"



Re: Insert into JDBC table

2022-09-12 Thread yuxia
"tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES 
(4000, 100, 'customer')");" should work. If not work, it seems to be a bug.

>> "Flink dynamic table is just a link to real data"
Yes, it is.

>> Is there any way to create an empty table? Or a table with some values 
>> defined in Flink?
Maybe you can try creating a table with the Hive dialect [1], which enables 
you to create a table in Hive using Flink SQL.
Or you can try flink-table-store [2].
AFAIK, it seems we can't directly create a table with some values defined.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_dialect/
[2] https://nightlies.apache.org/flink/flink-table-store-docs-master/

Best regards,
Yuxia
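
One more thing worth ruling out (an assumption based on the "does nothing, not 
even an error" symptom, not a confirmed diagnosis): executeSql() only submits 
the INSERT job and returns immediately, so a short-lived program can exit 
before the write completes. Blocking on the returned TableResult rules this 
out (reusing tEnv from the original snippet):

// executeSql() merely *submits* the INSERT job; await() blocks until it finishes.
tEnv.executeSql(
        "INSERT INTO Customers (customer_number, pid_no, name) "
            + "VALUES (4000, 100, 'customer')")
    .await();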

- Original Message -
From: "podunk" 
To: "User" 
Sent: Monday, September 12, 2022, 8:36:54 PM
Subject: Re: Insert into JDBC table

I see I can only insert into JDBC table with select from another table, 
something like:
 
tEnv.executeSql("INSERT INTO Customers SELECT customer_number, pid_no, name 
FROM another_table");

But what if I want to insert a row that I created within Flink? For instance, 
I made some calculation and want to insert a completely new row into the table 
(one that does not exist in any table)? Something like:

tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES 
(4000, 100, 'customer')");

?

One more question - a Flink dynamic table is just a link to real data (right?). 
Is there any way to create an empty table? Or a table with some values defined 
in Flink?

Thanks for help,

M.

 
 

Sent: Friday, September 09, 2022 at 3:03 PM
From: pod...@gmx.com
To: user@flink.apache.org
Subject: Insert into JDBC table
Why does this INSERT not insert a row into the table (the JDBC connection 
works; I can create the 'Customers' table from a MySQL table)?
 
tEnv.executeSql("CREATE TABLE Customers ("
+ " customer_number INT, "
+ " pid_no INT, "
+ " name STRING, "
+ " PRIMARY KEY (customer_number) NOT ENFORCED"
+ " ) WITH ( "
+ " 'connector' = 'jdbc', "
+ " 'url' = 'jdbc:mysql://localhost:3306/test', "
+ " 'username' = 'some_user', "
+ " 'table-name' = 'customers', "
+ " 'password' = ''"
+ ")");

// This insert does nothing (not even an error)
tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES 
(4000, 100, 'customer')");

According to the documentation 
(https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/), 
it should work.
Regards,

Mike


Re: Access to Table environment properties/Job arguments from DynamicTableFactory

2022-09-12 Thread yuxia
Have you checked DynamicTableFactory.Context#getConfiguration?
Is that what you're looking for?

Best regards, 
Yuxia 
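
For illustration, a minimal sketch of a factory reading the session 
configuration through that method; the factory identifier is hypothetical, and 
note this exposes the table environment's configuration, not arbitrary job CLI 
arguments:

import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

public class MySourceFactory implements DynamicTableSourceFactory {

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Read-only view of the session configuration, not just the WITH options:
        ReadableConfig envConf = context.getConfiguration();
        String zone = envConf.get(TableConfigOptions.LOCAL_TIME_ZONE);
        System.out.println("Session time zone: " + zone);
        throw new UnsupportedOperationException("sketch only -- build the real source here");
    }

    @Override
    public String factoryIdentifier() {
        return "my-source"; // hypothetical connector identifier
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}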


From: "Krzysztof Chmielewski"
To: "User"
Sent: Saturday, September 10, 2022, 12:51:09 AM
Subject: Access to Table environment properties/Job arguments from DynamicTableFactory

Hi, 
is there a way to access the Table Environment configuration or job arguments 
from a DynamicTableFactory (sink/source)?

I'm guessing not, but I just want to double-check that I'm not missing 
anything here.
To my understanding, we have access only to the table definition.

Thanks, 
Krzysztof Chmielewski 



Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-12 Thread zhanghao.chen
Hi,

Did you set any additional classloading-related configs (esp. the 
yarn.classpath.include-user-jar parameter)?

Best,
Zhanghao Chen

From: hjw <1010445...@qq.com>
Sent: Tuesday, September 13, 2022 1:58
To: user 
Subject: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

When I submit a job to YARN in per-job mode, an error occurs during the 
client-side generation of the JobGraph submitted to YARN.

Error:
java.lang.NoClassDefFoundError: org/apache/orc/PhysicalWriter

Because the cluster is public, the related 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is already present in the 
/opt/flink/lib directory.

However, this class is included in my jar: it is provided by the orc-core 
package, which I have bundled into my jar.

After some attempts, either of the following measures solves my problem:
1. Remove flink-orc_2.12-1.14.0-csa1.6.1.0.jar from the /opt/flink/lib 
directory and package it into my jar.
2. Put orc-core into the /opt/flink/lib directory.

However, I would like to know why an error occurs when 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar is placed in the /opt/flink/lib directory 
and orc-core is packaged into my jar.


Env:
Flink version: flink 1.14.0


Best,
Hjw



Re: How to perform something on Checkpointing and Savepointing

2022-09-12 Thread Hangxiang Yu
Hi,
I think maybe you could try to create a Function that implements
WithMasterCheckpointHook.
These hooks will be called by the checkpoint coordinator when triggering /
restoring a checkpoint.
You could see more details from [1].
[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/WithMasterCheckpointHook.java
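
To make that concrete, a minimal sketch of a pass-through function carrying 
such a hook; the marker-file helper is hypothetical and left empty:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;

public class MarkerFileMapper implements MapFunction<String, String>, WithMasterCheckpointHook<Void> {

    @Override
    public String map(String value) {
        return value; // pass-through; the hook is the interesting part
    }

    @Override
    public MasterTriggerRestoreHook<Void> createMasterTriggerRestoreHook() {
        return new MasterTriggerRestoreHook<Void>() {
            @Override
            public String getIdentifier() {
                return "marker-file-hook";
            }

            @Override
            public CompletableFuture<Void> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
                // Runs on the checkpoint coordinator when a checkpoint/savepoint
                // is triggered -- create the additional file here.
                return CompletableFuture.runAsync(() -> createMarkerFile(checkpointId), executor);
            }

            @Override
            public void restoreCheckpoint(long checkpointId, Void checkpointData) {
                // Called when restoring from this checkpoint.
            }

            @Override
            public SimpleVersionedSerializer<Void> createCheckpointDataSerializer() {
                return null; // this hook persists no state of its own
            }
        };
    }

    private static void createMarkerFile(long checkpointId) {
        // Hypothetical helper: write e.g. a checkpoint-<id>.marker file (left empty).
    }
}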

On Mon, Sep 12, 2022 at 11:20 PM Surendra Lalwani via user <
user@flink.apache.org> wrote:

> I tried these as well, but they need to be attached to some MapFunction or
> other function; can't we just apply this listener to the
> ExecutionEnvironment itself?
>
> Thanks and Regards ,
> Surendra Lalwani
>
>
> On Mon, Sep 12, 2022 at 8:42 PM Ben Edwards  wrote:
>
>> CheckpointedFunction / CheckpointListener.
>>
>> Best,
>> Ben
>>
>>
>> On Mon, Sep 12, 2022 at 4:01 PM Surendra Lalwani via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi Team,
>>>
>>> I want to perform some customized operations on checkpointing and
>>> savepointing. For example, I want to create an additional file every time
>>> checkpointing and savepointing gets triggered. Do we have anything as such?
>>> Any listeners or anything else?
>>>
>>> Thanks and Regards ,
>>> Surendra Lalwani
>>>
>>>



-- 
Best,
Hangxiang.


Re: Can flink dynamically detect high load and increase the job parallelism automatically?

2022-09-12 Thread zhanghao.chen
Hi Erez,

Unfortunately, autoscaling for streaming jobs is only available with reactive 
mode, which, as you've already pointed out, is still an MVP feature and only 
supports standalone mode. Some vendors (e.g. Ververica) have already shipped 
their own private implementations of Flink autoscaling, though.

Best,
Zhanghao Chen

From: Erez Yaakov 
Sent: Monday, September 12, 2022 21:38
To: user@flink.apache.org 
Subject: Can flink dynamically detect high load and increase the job 
parallelism automatically?


Hi,



When running a streaming job that uses a Kafka source, is it possible (without 
reactive mode) for Flink to dynamically detect high load (high consumer lag, 
high CPU usage…) and increase the job parallelism automatically?



I am running a Flink streaming job on an application-mode cluster using native 
k8s.

My streaming job consumes messages from a Kafka topic with 16 partitions; 
parallelism.default is set to 2, and no parallelism is set specifically on the 
operators/sources/sinks.



I tried to send messages to the Kafka topic at a high rate, faster than the 
job could consume them, and I saw that the consumer lag was increasing. I also 
saw in the Flink UI that the source task was turning red, indicating high 
utilization of this task.

Even though I created a high load on the job, I didn't see Flink automatically 
change the parallelism of the job to handle it.

Is it possible for Flink to increase the parallelism of my job (or of my 
source) dynamically based on the current load (and add TaskManagers 
automatically)? Or is this behavior only available by using reactive mode?



For reactive mode, my understanding based on the documentation is that it is 
in an MVP state, is only supported in standalone mode, and is not yet ready 
for production use.



Thanks,

Erez



Classloading issues with Flink Operator / Kubernetes Native

2022-09-12 Thread Yaroslav Tkachenko
Hey everyone,

I’m migrating a Flink Kubernetes standalone job to the Flink operator (with
Kubernetes native mode).

I have a lot of classloading issues when trying to run with the operator in
native mode. For example, I have a Postgres driver as a dependency (I can
confirm the files are included in the uber jar), but I still get
"java.sql.SQLException: No suitable driver found for jdbc:postgresql:..."
exception.

In the Kubernetes standalone setup my uber jar is placed in the
/opt/flink/lib folder, this is what I specify as "jarURI" in the operator
config. Is this supported? Should I only be using /opt/flink/usrlib?

Thanks for any suggestions.
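
In case it helps while debugging: one common workaround for "No suitable 
driver found" under child-first classloading (an assumption about the cause, 
not a confirmed diagnosis) is to register the driver explicitly from user 
code, since DriverManager's service-loader discovery can miss drivers that 
live only in the uber jar. A sketch with placeholder connection details:

import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class PgLookupFn extends RichMapFunction<String, String> {

    private transient Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Force driver registration through the user-code classloader.
        Class.forName("org.postgresql.Driver");
        conn = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/mydb", "user", "secret"); // placeholders
    }

    @Override
    public String map(String value) {
        return value; // real lookup logic omitted
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}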


A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-12 Thread hjw
When I submit a job to YARN in per-job mode, an error occurs during the 
client-side generation of the JobGraph submitted to YARN.

Error:
java.lang.NoClassDefFoundError: org/apache/orc/PhysicalWriter

Because the cluster is public, the related 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is already present in the 
/opt/flink/lib directory.

However, this class is included in my jar: it is provided by the orc-core 
package, which I have bundled into my jar.

After some attempts, either of the following measures solves my problem:
1. Remove flink-orc_2.12-1.14.0-csa1.6.1.0.jar from the /opt/flink/lib 
directory and package it into my jar.
2. Put orc-core into the /opt/flink/lib directory.

However, I would like to know why an error occurs when 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar is placed in the /opt/flink/lib directory 
and orc-core is packaged into my jar.

Env:
Flink version: flink 1.14.0



Best,
Hjw


 

Re: Where will the state be stored in the taskmanager when using rocksdbstatebackend?

2022-09-12 Thread hjw
Thanks, everyone. I will increase the parallelism to solve the problem. 
Besides, I am looking forward to support for remote state.

Best,
Hjw


 




-- Original --
From: "Yuan Mei"
[...]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#state-backend-rocksdb-localdir
Make sure to use a local SSD disk (not NFS/EBS).


Best,
Alexander Fedulov
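
For completeness, the same setting can also be applied programmatically; a 
minimal sketch with example paths:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
        // Point RocksDB's working directories at fast local disks (example paths);
        // equivalent to state.backend.rocksdb.localdir in flink-conf.yaml.
        backend.setDbStoragePaths("/mnt/ssd1/rocksdb", "/mnt/ssd2/rocksdb");
        env.setStateBackend(backend);

        env.fromElements(1, 2, 3).print();
        env.execute("rocksdb-localdir-example");
    }
}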


On Mon, Sep 5, 2022 at 7:24 PM hjw <1010445...@qq.com> wrote:

The EmbeddedRocksDBStateBackend holds in-flight data in 
a RocksDB database that is (per default) stored in the TaskManager 
local data directories.
Which operating-system path do the TaskManager local data directories that 
store the RocksDB database point to?
If the job state is very large, I think I should take some measures to deal 
with it (e.g., mount a volume for the local data directories that store the 
RocksDB database).


Thanks.



Best,
Hjw

Re: How to perform something on Checkpointing and Savepointing

2022-09-12 Thread Surendra Lalwani via user
I tried these as well, but they need to be attached to some MapFunction or
other function; can't we just apply this listener to the
ExecutionEnvironment itself?

Thanks and Regards ,
Surendra Lalwani


On Mon, Sep 12, 2022 at 8:42 PM Ben Edwards  wrote:

> CheckpointedFunction / CheckpointListener.
>
> Best,
> Ben
>
>
> On Mon, Sep 12, 2022 at 4:01 PM Surendra Lalwani via user <
> user@flink.apache.org> wrote:
>
>> Hi Team,
>>
>> I want to perform some customized operations on checkpointing and
>> savepointing. For example, I want to create an additional file every time
>> checkpointing and savepointing gets triggered. Do we have anything as such?
>> Any listeners or anything else?
>>
>> Thanks and Regards ,
>> Surendra Lalwani
>>
>>


Re: How to perform something on Checkpointing and Savepointing

2022-09-12 Thread Ben Edwards
CheckpointedFunction / CheckpointListener.

Best,
Ben
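
As a minimal sketch of the second option (the side effect is hypothetical, and 
note it fires in each parallel subtask on the task side, not once per job):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;

public class CheckpointAwareMapper<T> extends RichMapFunction<T, T> implements CheckpointListener {

    @Override
    public T map(T value) {
        return value; // pass-through; the callbacks are the interesting part
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // e.g. touch a marker file for this checkpoint (hypothetical side effect)
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // Optional: react to aborted checkpoints.
    }
}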


On Mon, Sep 12, 2022 at 4:01 PM Surendra Lalwani via user <
user@flink.apache.org> wrote:

> Hi Team,
>
> I want to perform some customized operations on checkpointing and
> savepointing. For example, I want to create an additional file every time
> checkpointing and savepointing gets triggered. Do we have anything as such?
> Any listeners or anything else?
>
> Thanks and Regards ,
> Surendra Lalwani
>
>


How to perform something on Checkpointing and Savepointing

2022-09-12 Thread Surendra Lalwani via user
Hi Team,

I want to perform some customized operations on checkpointing and
savepointing. For example, I want to create an additional file every time
checkpointing and savepointing gets triggered. Do we have anything as such?
Any listeners or anything else?

Thanks and Regards ,
Surendra Lalwani



Can flink dynamically detect high load and increase the job parallelism automatically?

2022-09-12 Thread Erez Yaakov
Hi,

When running a streaming job that uses a Kafka source, is it possible (without 
reactive mode) for Flink to dynamically detect high load (high consumer lag, 
high CPU usage...) and increase the job parallelism automatically?

I am running a Flink streaming job on an application-mode cluster using native 
k8s.
My streaming job consumes messages from a Kafka topic with 16 partitions; 
parallelism.default is set to 2, and no parallelism is set specifically on the 
operators/sources/sinks.

I tried to send messages to the Kafka topic at a high rate, faster than the 
job could consume them, and I saw that the consumer lag was increasing. I also 
saw in the Flink UI that the source task was turning red, indicating high 
utilization of this task.
Even though I created a high load on the job, I didn't see Flink automatically 
change the parallelism of the job to handle it.
Is it possible for Flink to increase the parallelism of my job (or of my 
source) dynamically based on the current load (and add TaskManagers 
automatically)? Or is this behavior only available by using reactive mode?

For reactive mode, my understanding based on the documentation is that it is 
in an MVP state, is only supported in standalone mode, and is not yet ready 
for production use.

Thanks,
Erez
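
Absent autoscaling, the practical alternative is to scale manually (stop with 
a savepoint, change the parallelism, restart). A minimal sketch of pinning the 
source parallelism to the partition count, independent of parallelism.default; 
broker address, topic, and group id are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ManualScalingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")           // placeholder address
                .setTopics("events")                         // placeholder topic
                .setGroupId("my-group")                      // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Match the source parallelism to the 16 partitions so all of them
        // are read concurrently.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(16)
                .print();

        env.execute("manually-scaled job");
    }
}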



Re: Insert into JDBC table

2022-09-12 Thread podunk
I see I can only insert into JDBC table with select from another table, 
something like:
 
tEnv.executeSql("INSERT INTO Customers SELECT customer_number, pid_no, name 
FROM another_table");

But what if I want to insert a row that I created within Flink? For instance, 
I made some calculation and want to insert a completely new row into the table 
(one that does not exist in any table)? Something like:

tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES 
(4000, 100, 'customer')");

?

One more question - a Flink dynamic table is just a link to real data (right?). 
Is there any way to create an empty table? Or a table with some values defined 
in Flink?

Thanks for help,

M.

 
 

Sent: Friday, September 09, 2022 at 3:03 PM
From: pod...@gmx.com
To: user@flink.apache.org
Subject: Insert into JDBC table
Why does this INSERT not insert a row into the table (the JDBC connection 
works; I can create the 'Customers' table from a MySQL table)?
 
tEnv.executeSql("CREATE TABLE Customers ("
+ " customer_number INT, "
+ " pid_no INT, "
+ " name STRING, "
+ " PRIMARY KEY (customer_number) NOT ENFORCED"
+ " ) WITH ( "
+ " 'connector' = 'jdbc', "
+ " 'url' = 'jdbc:mysql://localhost:3306/test', "
+ " 'username' = 'some_user', "
+ " 'table-name' = 'customers', "
+ " 'password' = ''"
+ ")");

// This insert does nothing (not even an error)
tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES 
(4000, 100, 'customer')");

According to the documentation 
(https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/), 
it should work.
Regards,

Mike
 
 


Re: Fail to build Flink 1.15.1

2022-09-12 Thread Chesnay Schepler
I think you want to use --single-branch instead of --depth 1; the latter 
only overrides top-level files, leaving the remaining files on whatever 
version you checked out previously.


https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---depthltdepthgt
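
Concretely, that suggestion translates to the following (shown with the HTTPS 
remote for illustration):

git clone --single-branch --branch release-1.15.1 https://github.com/apache/flink.git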

On 10/09/2022 16:08, Jun Qin wrote:

Thanks Chesnay,
I wanted to check out the tagged release 1.15.1. I did it in this way:
git clone --depth 1 --branch release-1.15.1 g...@github.com:apache/flink.git

This seems to cause the problem. With the same java/maven, I can build 
the branch 1.15.




On Sep 9, 2022, at 11:58 PM, Chesnay Schepler  wrote:

hmm...we've only seen that error in older Flink versions: 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/flinkdev/ide_setup/#compilation-fails-with-cannot-find-symbol-symbol-method-defineclass-location-class-sunmiscunsafe


Please double-check whether you actually checked out 1.15.1; I can't find a 
reference to sun.misc.Unsafe in the 1.15.1 version of the mentioned 
class: 
https://github.com/apache/flink/blob/release-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java


On 09/09/2022 22:01, Jun Qin wrote:

I have an issue when building a clean Flink 1.15.1 on macOS with:
mvn clean install -DskipTests -Dfast

% echo $JAVA_HOME
/usr/local/Cellar/openjdk@11/11.0.16.1

% mvn -version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /usr/local/Cellar/maven/3.8.6/libexec
Java version: 11.0.16.1, vendor: Homebrew, runtime: 
/usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home

Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: "mac"

% java -version
openjdk version "11.0.16.1" 2022-08-12
OpenJDK Runtime Environment Homebrew (build 11.0.16.1+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.16.1+0, mixed mode)

It failed with:
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile 
(default-compile) on project flink-test-utils-junit: Compilation failure
[ERROR] 
/Workspace/flink-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[244,53] 
cannot find symbol
[ERROR] symbol:   method 
defineClass(java.lang.String,byte[],int,int,java.lang.ClassLoader,java.security.ProtectionDomain)

[ERROR] location: class sun.misc.Unsafe

I also tried with a downloaded Maven 3.2.5 binary (Maven 3.2 has 
been disabled in brew):

% ~/Downloads/apache-maven-3.2.5/bin/mvn -version
Apache Maven 3.2.5 (12a6b3acb947671f09b81f49094c53f426d8cea1; 
2014-12-14T18:29:23+01:00)

Maven home: /Users/jqin/Downloads/apache-maven-3.2.5
Java version: 11.0.16.1, vendor: Homebrew
Java home: 
/usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home

Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: "mac"

It failed at the same place with the same error message.

Did I do anything wrong?

Jun