Re: Flink consuming MySQL

2023-08-07 Thread Shammon FY
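As mentioned above, using CDC directly is probably the better option for now. Reading the data yourself raises many problems, such as how to read updated rows, how to read incremental data, and how to handle failover, so using CDC directly is the most convenient.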

Best,
Shammon FY

On Tue, Aug 8, 2023 at 11:30 AM Jiabao Sun 
wrote:

> Hi,
>
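> You could try using flink-cdc-connectors to do the join in real time.
> A regular join needs to keep the full state of both tables; if the tables are large, the RocksDB state backend is recommended.
> If the table being joined against changes little, consider a lookup join.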
>
> Best,
> Jiabao


Re: Flink K8S operator does not support IPv6

2023-08-07 Thread Xiaolong Wang
Ok, thank you.

On Tue, Aug 8, 2023 at 11:22 AM Peter Huang 
wrote:

> We will handle it asap. Please check the status of this jira
> https://issues.apache.org/jira/browse/FLINK-32777


Re: Flink consuming MySQL

2023-08-07 Thread Jiabao Sun
Hi,

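You could try using flink-cdc-connectors to do the join in real time.
A regular join needs to keep the full state of both tables; if the tables are large, the RocksDB state backend is recommended.
If the table being joined against changes little, consider a lookup join.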
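
To illustrate the trade-off between the two join styles, here is a minimal sketch in plain Java. It is not Flink API: `RegularJoinSketch` and `LookupJoinSketch` are hypothetical names, rows are plain strings, and only inserts are modeled (no updates or deletes). It only shows why a regular join has to remember both inputs while a lookup join queries the other table per row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Regular join: every row from either side is remembered, so state grows with both tables. */
final class RegularJoinSketch {
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();

    /** A left row arrives: store it, then join it with all right rows seen so far. */
    List<String> onLeft(String key, String row) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (String right : rightState.getOrDefault(key, List.of())) {
            out.add(row + "|" + right);
        }
        return out;
    }

    /** A right row arrives: store it, then join it with all left rows seen so far. */
    List<String> onRight(String key, String row) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (String left : leftState.getOrDefault(key, List.of())) {
            out.add(left + "|" + row);
        }
        return out;
    }

    /** Number of rows currently held in state across both sides. */
    int storedRows() {
        int n = 0;
        for (List<String> rows : leftState.values()) n += rows.size();
        for (List<String> rows : rightState.values()) n += rows.size();
        return n;
    }
}

/** Lookup join: only the probing side streams; the other table is queried once per row. */
final class LookupJoinSketch {
    static String join(String key, String row, Function<String, String> lookup) {
        String dimension = lookup.apply(key);
        return dimension == null ? null : row + "|" + dimension;
    }
}
```

In the regular-join sketch both maps only ever grow, which is why a disk-backed state backend is suggested for large tables. The lookup variant keeps no state of its own, but it sees the other table only as it is at lookup time.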

Best,
Jiabao





Re: Flink K8S operator does not support IPv6

2023-08-07 Thread Peter Huang
We will handle it asap. Please check the status of this jira
https://issues.apache.org/jira/browse/FLINK-32777



Re: Flink consuming MySQL

2023-08-07 Thread 小昌同学
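Thank you for the guidance.
My current requirement is to read the data of two MySQL tables and join them in real time. The only options I can think of are either reading in real time with CDC or writing a loop that keeps reading the data from MySQL.
Do you have a better suggestion for this?

小昌同学
ccc0606fight...@163.com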


Flink K8S operator does not support IPv6

2023-08-07 Thread Xiaolong Wang
Hi,

I was testing flink-kubernetes-operator in an IPv6 cluster and ran into the
following issue:

Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname fd70:e66a:970d::1 not verified:
    certificate: sha256/EmX0EhNn75iJO353Pi+1rClwZyVLe55HN3l5goaneKQ=
    DN: CN=kube-apiserver
    subjectAltNames: [fd70:e66a:970d:0:0:0:0:1,
        2406:da14:2:770b:3b82:51d7:9e89:76ce, 10.0.170.248,
        c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com,
        ip-10-0-170-248.ap-northeast-1.compute.internal, kubernetes,
        kubernetes.default, kubernetes.default.svc,
        kubernetes.default.svc.cluster.local]

This seems to be related to a known issue of okhttp.

Is there a plan to support IPv6 in flink-kubernetes-operator in the near
future?
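
A small observation on the exception above: the hostname `fd70:e66a:970d::1` and the first subjectAltNames entry `fd70:e66a:970d:0:0:0:0:1` are two spellings of the same IPv6 address, so a purely textual comparison would not match them. The sketch below only demonstrates that difference with the JDK; it is not okhttp's verification code, and `Ipv6Compare` is a hypothetical helper name.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class Ipv6Compare {
    /**
     * Compares two IP literals by their parsed address bytes rather than by text.
     * Only pass literals: InetAddress.getByName would do a DNS lookup for host names.
     */
    static boolean sameAddress(String a, String b) {
        try {
            return InetAddress.getByName(a).equals(InetAddress.getByName(b));
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not a valid IP literal", e);
        }
    }
}
```

With the two values from the log, `"fd70:e66a:970d::1".equals("fd70:e66a:970d:0:0:0:0:1")` is false, while `Ipv6Compare.sameAddress(...)` on the same pair is true.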


Re: Flink consuming MySQL

2023-08-07 Thread Shammon FY
Hi,

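After you finish reading the ResultSet in your code, you need to close the resultSet to avoid a resource leak.

As for the program ending once the MySQL data has been read, which part exactly do you mean? MySQL here is a bounded source: once the data has been consumed and the whole job has finished computing, the job ends. This is normal behavior.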
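
Both points can be sketched in plain Java, without Flink or JDBC on the classpath. All names here are hypothetical: `RowCursor` stands in for a JDBC `ResultSet`, `ListCursor` is a fake data source, and `PollingReader` mirrors the `run()`/`cancel()` shape of the source function in this thread. The source finishes when `run()` returns, so an unbounded reader has to keep looping until `cancel()` flips a flag, and try-with-resources closes the cursor after every pass.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Stand-in for a JDBC ResultSet: a forward-only cursor that must be closed. */
interface RowCursor extends AutoCloseable {
    boolean next();
    String current();
    @Override
    void close();
}

/** Fake cursor over an in-memory list; onClose lets callers observe that close() ran. */
final class ListCursor implements RowCursor {
    private final Iterator<String> rows;
    private final Runnable onClose;
    private String current;

    ListCursor(List<String> rows, Runnable onClose) {
        this.rows = rows.iterator();
        this.onClose = onClose;
    }

    @Override
    public boolean next() {
        if (!rows.hasNext()) return false;
        current = rows.next();
        return true;
    }

    @Override
    public String current() { return current; }

    @Override
    public void close() { onClose.run(); }
}

final class PollingReader {
    // volatile because cancel() is typically called from another thread
    private volatile boolean running = true;

    /** Keeps re-running the query until cancel() is called; returning from here ends the source. */
    void run(Supplier<RowCursor> query, Consumer<String> out, long pauseMillis) {
        while (running) {
            // try-with-resources closes the cursor even if emitting a row throws
            try (RowCursor cursor = query.get()) {
                while (running && cursor.next()) {
                    out.accept(cursor.current());
                }
            }
            if (!running) break;
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                return;
            }
        }
    }

    void cancel() { running = false; }
}
```

Note that this loop re-emits every row on each pass and knows nothing about updates, deletes or restarts after a failure, so it is only a demonstration of the lifecycle, not a recommended way to follow a changing table.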

Best,
Shammon FY



Re: Re: How to smoothly migrate Flink jobs from YARN to K8s?

2023-08-07 Thread casel.chen






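What if we are not using the Flink Kubernetes Operator, or the HDFS and OSS systems cannot reach each other directly over the network?
Is there a way to read the checkpoint/savepoint from HDFS and then save it again as a checkpoint/savepoint on OSS? Thanks!

On 2023-08-07 10:33:25, "Ruibin Xing" wrote:
>Hello,
>
>If you are also using the official Flink Kubernetes Operator, our migration
>experience may help: when migrating, set the FlinkDeployment's
>initialSavepointPath to the address of the savepoint on HDFS, and configure
>the savepoint/checkpoint directories to point to OSS. Flink will then restore
>from the state in HDFS on startup and write new checkpoints to OSS.
>
>On Sun, Aug 6, 2023 at 10:03 PM casel.chen  wrote:
>
>> The checkpoints/savepoints of our Flink on YARN jobs are stored on HDFS.
>> We now want to migrate the jobs to run on K8s, which uses OSS object
>> storage. How can we migrate the job state seamlessly? We are using Flink
>> 1.15.2. Thanks!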


Easiest way to do a batch outer join

2023-08-07 Thread Flavio Pompermaier
Hello everybody,
I have a use case where I need to exclude all already-indexed documents from
a DataStream (technically a DataSet, since I work in batch mode).
My idea is to perform an outer join, but I didn't find any simple example for
DataStream in batch mode. I've tried using coGroup(), but it requires me to
specify a window strategy, which I wouldn't expect in batch mode. Can I use a
global window?

Thanks in advance,
Flavio
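
The operation described above is a left anti-join: keep only the left-side records that have no match on the right. The sketch below shows that logic in plain Java, not in the DataStream API, and it does not answer the windowing question. `AntiJoinSketch`, `excludeIndexed` and the `idOf` key extractor are hypothetical names, and the indexed ids are assumed to fit in memory.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class AntiJoinSketch {
    /** Returns the documents whose id does not appear in indexedIds, preserving input order. */
    static <T> List<T> excludeIndexed(Collection<T> docs,
                                      Collection<String> indexedIds,
                                      Function<T, String> idOf) {
        Set<String> indexed = new HashSet<>(indexedIds); // build side of the join
        List<T> result = new ArrayList<>();
        for (T doc : docs) {                              // probe side of the join
            if (!indexed.contains(idOf.apply(doc))) {
                result.add(doc);
            }
        }
        return result;
    }
}
```

For example, `AntiJoinSketch.excludeIndexed(List.of("a", "b", "c"), List.of("b"), id -> id)` keeps `a` and `c`.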


Re: Flink 1.14.5 sql-client cannot query HBase 1.4.3 data

2023-08-07 Thread Shammon FY
Hi,

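This looks like a version conflict. Have you put HBase jars in your Flink session cluster directory? You could check whether their version matches the HBase version shaded into the Flink HBase connector.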
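
One generic way to check for such a conflict is to ask the JVM which jar a class was actually loaded from. The helper below is a minimal sketch using only JDK calls; `ClassOrigin` is a hypothetical name, and which class name to pass is up to you (for instance one that appears in your stack trace).

```java
import java.security.CodeSource;

final class ClassOrigin {
    /** Returns the jar or directory a class was loaded from, or a placeholder for JDK classes. */
    static String locate(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap or unknown>"; // JDK classes have no code source
        }
        return source.getLocation().toString();
    }

    /** Same as locate(Class), but resolves the class by name on the current classpath first. */
    static String locate(String className) throws ClassNotFoundException {
        return locate(Class.forName(className));
    }
}
```

If the reported location is a jar you added yourself rather than the connector jar you expected, that suggests two copies of the same library are competing on the classpath.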

Best,
Shammon FY

On Sat, Aug 5, 2023 at 9:33 PM 杨东树  wrote:

> Hi all,
> I am currently unable to query HBase data with sql-client. Any guidance
> would be appreciated, thanks.
> Steps to reproduce:
> 1. HBase operations:
> hbase(main):005:0> create 'flink_to_hbase','cf1'
> 0 row(s) in 2.2900 seconds
> hbase(main):006:0> put 'flink_to_hbase', 'rk0001', 'cf1:username',
> 'zhangsan'
> 0 row(s) in 0.0510 seconds
>
>
> 2. Flink operations:
> ./start-cluster.sh
> ./sql-client.sh
> CREATE TABLE flink_to_hbase(
> rowkey STRING,
> cf1 ROW<username STRING>,
> PRIMARY KEY (rowkey) NOT ENFORCED
> )WITH(
> 'connector'='hbase-1.4',
> 'table-name'='flink_to_hbase',
> 'zookeeper.quorum'='192.168.21.128:2181',
> 'zookeeper.znode.parent'='/hbase'
> );
>
>
> 3. Flink error log:
> 2023-08-05 21:00:35,081 INFO  org.apache.flink.table.client.cli.CliClient
> [] - Command history file path: /root/.flink-sql-history
> 2023-08-05 21:00:52,011 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:00:52,026 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Submitting job 'collect' (0c147bc0da5a43a5a382f2ec20740b45).
> 2023-08-05 21:00:52,480 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Successfully submitted job 'collect' (0c147bc0da5a43a5a382f2ec20740b45) to '
> http://localhost:8081'.
> 2023-08-05 21:00:55,809 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:00:55,830 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:07:52,481 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:07:52,484 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Submitting job 'collect' (d29904103fa3c83e3089c09f093372c9).
> 2023-08-05 21:07:52,728 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Successfully submitted job 'collect' (d29904103fa3c83e3089c09f093372c9) to '
> http://localhost:8081'.
> 2023-08-05 21:07:55,972 WARN  org.apache.flink.table.client.cli.CliClient
> [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:211)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:231)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:532)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:423)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_231]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:209)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> 

Can issues with Kafka consumption influence the watermark?

2023-08-07 Thread Piotr Domagalski
Hi!

We have a rather simple Flink job whose main purpose is concatenating events
read from Kafka and outputting them in session windows with a gap of 1 minute,
1 minute of allowed out-of-orderness, and a 1h idleness setting:
[image: Screenshot 2023-08-07 at 13.44.09.png]
The problem we are facing is that we sometimes see it starts to struggle
and it seems it all starts with the following message appearing in the
source operator:

[Consumer clientId=sessionizer, groupId=sessionizer] Error sending fetch
request (sessionId=1914084170, epoch=8) to node 2: [Consumer
clientId=sessionizer, groupId=sessionizer] Cancelled in-flight FETCH
request with correlation id 105760966 due to node 2 being disconnected
(elapsed time since creation: 31311ms, elapsed time since send: 31311ms,
request timeout: 3ms) [Consumer clientId=sessionizer,
groupId=sessionizer] Disconnecting from node 2 due to request timeout.

What happens right after this is puzzling. In theory, we understand that
issues consuming from some partitions (as the error above seems to suggest)
should not influence how watermarks are handled, i.e. the watermark will not
move forward, so this should not cause any sessions to be closed prematurely.
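
The expectation in the paragraph above can be written down as a small model. This is a simplified sketch, not Flink's implementation: `WatermarkSketch` and its methods are hypothetical names, each partition tracks a bounded-out-of-orderness watermark, and the combined watermark is the minimum over partitions that are not marked idle.

```java
final class WatermarkSketch {
    /** Per-partition watermark: highest timestamp seen, minus the allowed out-of-orderness, minus 1 ms. */
    static long partitionWatermark(long maxTimestampMillis, long outOfOrdernessMillis) {
        return maxTimestampMillis - outOfOrdernessMillis - 1;
    }

    /**
     * Combined watermark: the minimum over all partitions that are not idle.
     * Returning Long.MIN_VALUE when every partition is idle is a simplification of this sketch.
     */
    static long combined(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, partitionWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

In this model a stalled partition that is still counted as active holds the combined watermark back, and the watermark only jumps ahead once that partition is marked idle (after 1h with the setting above). Whether this model matches what the job actually did during the incident is exactly the open question.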

However, what we see is *the opposite*: shortly after the error (within 1-2
minutes) we see a spike in session windows being closed, just as if Flink had
moved forward in time (as tracked by the watermark) but, because some events
were missing (not consumed), decided to close a lot of windows since the
sessions were not getting any events. But how is this possible, if at all?
Can anyone think of any (even remotely) possible explanation?

When this happens, we then see the expected sequence of events: a lot of data
gets produced to the sink, we run into the Kafka producer quota limits we have
defined, this creates backpressure, and we see lag. The Kafka errors disappear
after 15-20 minutes and the situation goes back to normal. However, some
corrupted data gets produced whenever this happens.

This is Flink 1.15.2 running in AWS as Kinesis Data Analytics. There are
144 partitions on the input, parallelism 72, we use Kafka msg event
timestamps (as set by the producer). We've seen it before with 72
partitions and parallelism of 72.

-- 
Piotr Domagalski


Flink consuming MySQL

2023-08-07 Thread 小昌同学
Hello everyone,
I am reading data from MySQL locally by extending the RichSourceFunction class, but why does the program stop automatically once it has fetched all the data from MySQL?
Here is my code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// FlinkConfig and ActionType are project classes that are not shown here.
public class MysqlSource2 extends RichSourceFunction<ActionType> {
    PreparedStatement ps;
    private Connection connection;

    // Opens the JDBC connection and prepares the query once per source instance.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from actiontype;";
        ps = connection.prepareStatement(sql);
    }

    private static Connection getConnection() {
        Connection con = null;
        String driverClass = FlinkConfig.config.getProperty("driverClass");
        String url = FlinkConfig.config.getProperty("jdbcUrl");
        String user = FlinkConfig.config.getProperty("jdbcUser");
        String passWord = FlinkConfig.config.getProperty("passWord");

        try {
            Class.forName(driverClass);
            con = DriverManager.getConnection(url, user, passWord);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return con;
    }

    // Runs the query once and emits every row; the method returns when the rows are exhausted.
    @Override
    public void run(SourceContext<ActionType> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            ActionType actionType = new ActionType(
                    resultSet.getString("action"),
                    resultSet.getString("action_name")
            );
            ctx.collect(actionType);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (null != connection) {
            connection.close();
        }
        if (null != ps) {
            ps.close();
        }
    }

    @Override
    public void cancel() {
    }
}

小昌同学
ccc0606fight...@163.com