回复: Flink SQL的状态清理

2023-10-09 Thread
你好,老师,我也是这样设置的,我这边是flink sql client,但是我去flink web ui界面并没有看到这个配置生效。


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jane Chan |
| 发送日期 | 2023年9月25日 11:24 |
| 收件人 |  |
| 主题 | Re: Flink SQL的状态清理 |
Hi,

可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86

Best,
Jane

On Thu, Sep 21, 2023 at 5:17 PM faronzz  wrote:

试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s")




| |
faronzz
|
|
faro...@163.com
|


 回复的原邮件 
| 发件人 | 小昌同学 |
| 发送日期 | 2023年09月21日 17:06 |
| 收件人 | user-zh |
| 主题 | Flink SQL的状态清理 |


各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
| |
小昌同学
|
|
ccc0606fight...@163.com
|


Flink SQL的状态清理

2023-09-21 Thread


各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛
| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: Flink 窗口触发条件

2023-08-09 Thread
好滴呀 谢谢各位老师指导




| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Yanfei Lei |
| 发送日期 | 2023年8月10日 11:50 |
| 收件人 |  |
| 主题 | Re: Flink 窗口触发条件 |
hi,
感觉和[1]的问题比较像,事件时间的window在onElement和onEventTime时会触发,这两个方法又会根据watermark判断,可以看看o.a.f.table.runtime.operators.window.triggers包和o.a.f.table.runtime.operators.wmassigners包。

[1] https://juejin.cn/post/6850418110010179597

小昌同学  于2023年8月10日周四 10:52写道:

各位老师好,我这边在使用Flink的事件时间窗口的时候,关于窗口触发的条件我有一点疑问想咨询一下各位老师
我是开了一个2分钟的事件时间的窗口,但是等到两分钟后窗口并没有主动触发,等我后面再发一条数据的时候,窗口再进行了触发
所以我想请问一下窗口的触发机制不是时间点嘛,而是非要等到下一条数据发送,依赖于下一条数据携带的时间戳大于窗口的结束时间,上一个窗口才会真正的触发嘛
请各位老师指导一下




| |
小昌同学
|
|
ccc0606fight...@163.com
|



--
Best,
Yanfei


Flink 窗口触发条件

2023-08-09 Thread
各位老师好,我这边在使用Flink的事件时间窗口的时候,关于窗口触发的条件我有一点疑问想咨询一下各位老师
我是开了一个2分钟的事件时间的窗口,但是等到两分钟后窗口并没有主动触发,等我后面再发一条数据的时候,窗口再进行了触发
所以我想请问一下窗口的触发机制不是时间点嘛,而是非要等到下一条数据发送,依赖于下一条数据携带的时间戳大于窗口的结束时间,上一个窗口才会真正的触发嘛
请各位老师指导一下




| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: 回复: Flink消费MySQL

2023-08-09 Thread
好的 谢谢各位老师的指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | ron |
| 发送日期 | 2023年8月10日 00:51 |
| 收件人 |  |
| 主题 | Re: 回复: Flink消费MySQL |

Hi,

建议通过CDC实时读,然后用Flink的双流Join进行关联。

-原始邮件-
发件人: "小昌同学" 
发送时间: 2023-08-08 11:10:19 (星期二)
收件人: user-zh 
抄送: user-zh 
主题: 回复: Flink消费MySQL

谢谢老师指导呀;
我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据
老师这一块有更好的建议嘛


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年8月8日 10:37 |
| 收件人 |  |
| 主题 | Re: Flink消费MySQL |
Hi,

你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏

至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况

Best,
Shammon FY

On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:

各位老师好
,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
以下是我的代码:
|
public class MysqlSource2 extends RichSourceFunction {
PreparedStatement ps;
private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql="select * from actiontype;";
ps = connection.prepareStatement(sql);
}

private static Connection getConnection(){
Connection con=null;
String driverClass= FlinkConfig.config.getProperty("driverClass");
String url=FlinkConfig.config.getProperty("jdbcUrl");
String user=FlinkConfig.config.getProperty("jdbcUser");
String passWord=FlinkConfig.config.getProperty("passWord");

try {
Class.forName(driverClass);
con= DriverManager.getConnection(url,user,passWord);
} catch (Exception e) {
throw new RuntimeException(e);
}
return con;
}

@Override
public void run(SourceContext ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
ActionType actionType = new ActionType(
resultSet.getString("action"),
resultSet.getString("action_name")
);
ctx.collect(actionType);
}
}

@Override
public void close() throws Exception {
super.close();
if (null!=connection){
connection.close();
}
if (null!=ps){
ps.close();
}
}

@Override
public void cancel() {
}
};


|


| |
小昌同学
|
|
ccc0606fight...@163.com
|


--
Best,
Ron


回复: Flink消费MySQL

2023-08-07 Thread
谢谢老师指导呀;
我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据
老师这一块有更好的建议嘛


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年8月8日 10:37 |
| 收件人 |  |
| 主题 | Re: Flink消费MySQL |
Hi,

你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏

至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况

Best,
Shammon FY

On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:

各位老师好
,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
以下是我的代码:
|
public class MysqlSource2 extends RichSourceFunction {
PreparedStatement ps;
private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql="select * from actiontype;";
ps = connection.prepareStatement(sql);
}

private static Connection getConnection(){
Connection con=null;
String driverClass= FlinkConfig.config.getProperty("driverClass");
String url=FlinkConfig.config.getProperty("jdbcUrl");
String user=FlinkConfig.config.getProperty("jdbcUser");
String passWord=FlinkConfig.config.getProperty("passWord");

try {
Class.forName(driverClass);
con= DriverManager.getConnection(url,user,passWord);
} catch (Exception e) {
throw new RuntimeException(e);
}
return con;
}

@Override
public void run(SourceContext ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
ActionType actionType = new ActionType(
resultSet.getString("action"),
resultSet.getString("action_name")
);
ctx.collect(actionType);
}
}

@Override
public void close() throws Exception {
super.close();
if (null!=connection){
connection.close();
}
if (null!=ps){
ps.close();
}
}

@Override
public void cancel() {
}
};


|


| |
小昌同学
|
|
ccc0606fight...@163.com
|


Flink消费MySQL

2023-08-07 Thread
各位老师好 
,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
以下是我的代码:
|
public class MysqlSource2 extends RichSourceFunction {
PreparedStatement ps;
private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql="select * from actiontype;";
ps = connection.prepareStatement(sql);
}

private static Connection getConnection(){
Connection con=null;
String driverClass= FlinkConfig.config.getProperty("driverClass");
String url=FlinkConfig.config.getProperty("jdbcUrl");
String user=FlinkConfig.config.getProperty("jdbcUser");
String passWord=FlinkConfig.config.getProperty("passWord");

try {
Class.forName(driverClass);
con= DriverManager.getConnection(url,user,passWord);
} catch (Exception e) {
throw new RuntimeException(e);
}
return con;
}

@Override
public void run(SourceContext ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
ActionType actionType = new ActionType(
resultSet.getString("action"),
resultSet.getString("action_name")
);
ctx.collect(actionType);
}
}

@Override
public void close() throws Exception {
super.close();
if (null!=connection){
connection.close();
}
if (null!=ps){
ps.close();
    }
}

@Override
public void cancel() {
}
};


|


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: JdbcSink引发的IO过高

2023-07-25 Thread
你好,感谢老师回复
`insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,我理解这两种情况的话,对于数据库来说,就是一次插入与两次插入的问题了吧,要是数据量大的话,感觉对性能还是有影响的
| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年7月26日 10:52 |
| 收件人 |  |
| 主题 | Re: JdbcSink引发的IO过高 |
Hi,

目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc
server,这样会节省网络IO。

具体到数据库侧,我理解执行 `insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下

Best,
Shammon FY


On Tue, Jul 25, 2023 at 4:02 PM 小昌同学  wrote:

各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:

我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
是
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);
或者是
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO



|
errorStream.addSink(JdbcSink.sink(
"insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
(statement, result) -> {
statement.setString(1,result.getAction());
statement.setString(2,result.getServerIp());
statement.setString(3,result.getHandleSerialno());
statement.setString(4,result.getMd5Num());
statement.setString(5,result.getInsertTime());
statement.setString(6, result.getDateTime());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://111/data_ret_log?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("222")
.build()
)).name("sink-error-mysql");
|
| |
小昌同学
|
|
ccc0606fight...@163.com
|


JdbcSink引发的IO过高

2023-07-25 Thread
各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师:
我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式
是
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);
或者是
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO



|
errorStream.addSink(JdbcSink.sink(
"insert  into error_md5_info (action, serverIp, 
handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
(statement, result) -> {
statement.setString(1,result.getAction());
statement.setString(2,result.getServerIp());
statement.setString(3,result.getHandleSerialno());
statement.setString(4,result.getMd5Num());
statement.setString(5,result.getInsertTime());
statement.setString(6, result.getDateTime());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://111/data_ret_log?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("222")
        .build()
)).name("sink-error-mysql");
|
| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: flink jdbcsink 连接数的问题

2023-05-31 Thread
你好,老师,感谢你的回复;
我在MySQL中设置的主键不是自增主键,是一个业务主键的,所以我理解是不是这一块只能是设置并发度为1进行数据的插入


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | yidan zhao |
| 发送日期 | 2023年5月31日 12:30 |
| 收件人 |  |
| 主题 | Re: flink jdbcsink 连接数的问题 |
你好,这个问题和flink无关,看你主键实现机制吧,如果是自增,那就是mysql级别自动实现的自增,跟flink搭不上关系的。

小昌同学  于2023年5月31日周三 09:41写道:

老师,你好,再请教一下,连接数与并行度有关系的话,如果插入数据的MySQL是有主键的话,是不是连接数据也就是并行度只能为1啦呀,如果是多个并行度的话,可能会造成主键冲突;
感谢各位老师的指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | lxk |
| 发送日期 | 2023年5月30日 14:30 |
| 收件人 |  |
| 主题 | Re:flink jdbcsink 连接数的问题 |
hi,
jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。
关于连接数,则是跟你的并行度有关。

















在 2023-05-30 13:55:57,"小昌同学"  写道:
各位老师,请教一下关于flink jdbcsink 连接数的问题;
我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀;
谢谢各位老师的指导

|
outPutInfoStream.addSink(JdbcSink.sink(
"REPLACE  into InputInfo (breakCode, breakName, breakDuration, 
breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs)
 values (?,?,?,?,?,?,?,?,?,?)",
(statement, InPutInfo) -> {
statement.setString(1,InPutInfo.getBreakCode());
statement.setString(2,InPutInfo.getBreakName());
statement.setLong(3,InPutInfo.getBreakDuration());
statement.setString(4,InPutInfo.getBreakRule());
statement.setString(5,InPutInfo.getBreakPrimaryKey());
statement.setString(6, InPutInfo.getBreakStep());
statement.setString(7, InPutInfo.getBreakStepType());
statement.setString(8,InPutInfo.getBreakTime());
statement.setString(9, DateUtil.format(new Date()));
statement.setString(10, String.valueOf(InPutInfo.getBreakArgs()));
},
JdbcExecutionOptions.builder()
.withBatchSize(10)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("111")
.build()
)).name("sink-mysql");
|


| |
小昌同学
|
|
ccc0606fight...@163.com
|


回复: flink 输出异常数据

2023-05-31 Thread
你好,老师,感谢你的回复;
您说的打印到日志文件,是需要配置flink 的logback.xml 嘛,这一块的配置有一个小demo可以参考嘛


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年6月1日 10:33 |
| 收件人 |  |
| 主题 | Re: flink 输出异常数据 |
Hi

可以看一下报空指针的具体异常栈,如果是你的业务代码,可以在你的处理逻辑里加上一些判断信息并打印到日志文件;如果不是你的业务代码,可以贴一下具体的异常栈信息。

On Wed, May 31, 2023 at 12:31 PM yidan zhao  wrote:

这个得靠你自己打日志吧,在可能出NPE的地方 try catch 到,然后打印原始记录。

小昌同学  于2023年5月29日周一 18:30写道:

你好,数据源是kafka,使用的是stream api


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Weihua Hu |
| 发送日期 | 2023年5月29日 15:29 |
| 收件人 |  |
| 主题 | Re: flink 输出异常数据 |
Hi,

你使用的数据源是什么呢?Kafka 吗?用的是 FlinkSQL 还是 DataStream API 呢?

方便把异常栈贴一下吗

Best,
Weihua


On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:



各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|



回复: flink web ui显示问题

2023-05-30 Thread
你好,老师,感谢回复,我这边将截图放在了腾讯文档中,请查收;
感谢各位老师的指导
【腾讯文档】flink web ui
https://docs.qq.com/sheet/DYkZ0Q0prRWJxcER4?tab=BB08J2


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年5月31日 09:59 |
| 收件人 |  |
| 主题 | Re: flink web ui显示问题 |
Hi,

好像没有收到附件或者文档,你可以检查确认一下

Best,
Shammon FY

On Wed, May 31, 2023 at 9:52 AM 小昌同学  wrote:

各位老师好,请教一个关于flink web ui的显示问题;
具体的显示异常截图的我以附件的形式放在文档中,我的疑惑是web
ui上面已经显示watermark,但是看detail的时候显示不是watermark;
感谢各位老师指导

小昌同学
ccc0606fight...@163.com

<https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D>



flink web ui显示问题

2023-05-30 Thread
各位老师好,请教一个关于flink web ui的显示问题;
具体的显示异常截图的我以附件的形式放在文档中,我的疑惑是web ui上面已经显示watermark,但是看detail的时候显示不是watermark;
感谢各位老师指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复:flink jdbcsink 连接数的问题

2023-05-30 Thread
老师,你好,再请教一下,连接数与并行度有关系的话,如果插入数据的MySQL是有主键的话,是不是连接数据也就是并行度只能为1啦呀,如果是多个并行度的话,可能会造成主键冲突;
感谢各位老师的指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | lxk |
| 发送日期 | 2023年5月30日 14:30 |
| 收件人 |  |
| 主题 | Re:flink jdbcsink 连接数的问题 |
hi,
jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。
关于连接数,则是跟你的并行度有关。

















在 2023-05-30 13:55:57,"小昌同学"  写道:
各位老师,请教一下关于flink jdbcsink 连接数的问题;
我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀;
谢谢各位老师的指导

|
outPutInfoStream.addSink(JdbcSink.sink(
"REPLACE  into InputInfo (breakCode, breakName, breakDuration, 
breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs)
 values (?,?,?,?,?,?,?,?,?,?)",
(statement, InPutInfo) -> {
statement.setString(1,InPutInfo.getBreakCode());
statement.setString(2,InPutInfo.getBreakName());
statement.setLong(3,InPutInfo.getBreakDuration());
statement.setString(4,InPutInfo.getBreakRule());
statement.setString(5,InPutInfo.getBreakPrimaryKey());
statement.setString(6, InPutInfo.getBreakStep());
statement.setString(7, InPutInfo.getBreakStepType());
statement.setString(8,InPutInfo.getBreakTime());
statement.setString(9, DateUtil.format(new Date()));
statement.setString(10, String.valueOf(InPutInfo.getBreakArgs()));
},
JdbcExecutionOptions.builder()
.withBatchSize(10)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("111")
.build()
)).name("sink-mysql");
|


| |
小昌同学
|
|
ccc0606fight...@163.com
|


回复:flink jdbcsink 连接数的问题

2023-05-30 Thread
好滴呀 谢谢老师指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | lxk |
| 发送日期 | 2023年5月30日 14:30 |
| 收件人 |  |
| 主题 | Re:flink jdbcsink 连接数的问题 |
hi,
jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。
关于连接数,则是跟你的并行度有关。

















在 2023-05-30 13:55:57,"小昌同学"  写道:
各位老师,请教一下关于flink jdbcsink 连接数的问题;
我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀;
谢谢各位老师的指导

|
outPutInfoStream.addSink(JdbcSink.sink(
"REPLACE  into InputInfo (breakCode, breakName, breakDuration, 
breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs)
 values (?,?,?,?,?,?,?,?,?,?)",
(statement, InPutInfo) -> {
statement.setString(1,InPutInfo.getBreakCode());
statement.setString(2,InPutInfo.getBreakName());
statement.setLong(3,InPutInfo.getBreakDuration());
statement.setString(4,InPutInfo.getBreakRule());
statement.setString(5,InPutInfo.getBreakPrimaryKey());
statement.setString(6, InPutInfo.getBreakStep());
statement.setString(7, InPutInfo.getBreakStepType());
statement.setString(8,InPutInfo.getBreakTime());
statement.setString(9, DateUtil.format(new Date()));
statement.setString(10, String.valueOf(InPutInfo.getBreakArgs()));
},
JdbcExecutionOptions.builder()
.withBatchSize(10)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("111")
.build()
)).name("sink-mysql");
|


| |
小昌同学
|
|
ccc0606fight...@163.com
|


flink jdbcsink 连接数的问题

2023-05-29 Thread
各位老师,请教一下关于flink jdbcsink 连接数的问题;
我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀;
谢谢各位老师的指导

|
outPutInfoStream.addSink(JdbcSink.sink(
"REPLACE  into InputInfo (breakCode, breakName, breakDuration, 
breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs)
 values (?,?,?,?,?,?,?,?,?,?)",
(statement, InPutInfo) -> {
statement.setString(1,InPutInfo.getBreakCode());
statement.setString(2,InPutInfo.getBreakName());
statement.setLong(3,InPutInfo.getBreakDuration());
statement.setString(4,InPutInfo.getBreakRule());
statement.setString(5,InPutInfo.getBreakPrimaryKey());
statement.setString(6, InPutInfo.getBreakStep());
statement.setString(7, InPutInfo.getBreakStepType());
statement.setString(8,InPutInfo.getBreakTime());
statement.setString(9, DateUtil.format(new Date()));
statement.setString(10, 
String.valueOf(InPutInfo.getBreakArgs()));
},
JdbcExecutionOptions.builder()
.withBatchSize(10)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("111")
.withPassword("111")
.build()
)).name("sink-mysql");
|


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: flink 输出异常数据

2023-05-29 Thread
你好,数据源是kafka,使用的是stream api


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Weihua Hu |
| 发送日期 | 2023年5月29日 15:29 |
| 收件人 |  |
| 主题 | Re: flink 输出异常数据 |
Hi,

你使用的数据源是什么呢?Kafka 吗?用的是 FlinkSQL 还是 DataStream API 呢?

方便把异常栈贴一下吗

Best,
Weihua


On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:


各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|


flink 输出异常数据

2023-05-28 Thread
各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: flink 窗口触发计算的条件

2023-05-25 Thread
请教一下老师,您说的【同样数据的话,水印没有推进,窗口就不会触发】是不是意思是发送相同的数据,数据本身携带的时间戳是一样的,达不到水位线触发窗口的标准呀?
还有两个问题想请教一下各位老师:
1、事件时间窗口的闭合是取决于下一条数据所携带的时间戳嘛,只有当下一条数据携带的时间戳大于上一个窗口的endTime,窗口才会触发,如果是这个样子的话,那如果一个最后一个窗口怎么触发啊
2、我想使用stream api去打印出来窗口的起始时间以及结束时间,这个是哪一个api呀


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | lxk |
| 发送日期 | 2023年5月25日 10:14 |
| 收件人 |  |
| 主题 | Re:回复: flink 窗口触发计算的条件 |
你好,可以先看看官方文档中关于事件时间和水印的介绍
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/
如果你发了多条数据,但是都是同样数据的话,水印没有推进,窗口就不会触发



















在 2023-05-25 10:00:36,"小昌同学"  写道:
是的 我发送了很多数据,发现窗口还是没有触发


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | yidan zhao |
| 发送日期 | 2023年5月25日 09:59 |
| 收件人 |  |
| 主题 | Re: flink 窗口触发计算的条件 |
如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。

小昌同学  于2023年5月25日周四 09:32写道:

各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
相关代码以及样例数据如下:
|
package job;
import bean.MidInfo3;
import bean.Result;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import utils.DateUtil;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly9 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
//1、消费Kafka中的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource sourceStream = env.addSource(new 
FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
sourceStream.print("最源端的数据sourceStream");

//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
SingleOutputStreamOperator baseInfoStream = sourceStream.map(new 
MapFunction() {
@Override
public BaseInfo2 map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
String time = splits[0].substring(7, 19).replace("-", "").trim();
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的

回复: flink 窗口触发计算的条件

2023-05-24 Thread
是的 我发送了很多数据,发现窗口还是没有触发


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | yidan zhao |
| 发送日期 | 2023年5月25日 09:59 |
| 收件人 |  |
| 主题 | Re: flink 窗口触发计算的条件 |
如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。

小昌同学  于2023年5月25日周四 09:32写道:

各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
相关代码以及样例数据如下:
|
package job;
import bean.MidInfo3;
import bean.Result;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import utils.DateUtil;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly9 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
//1、消费Kafka中的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource sourceStream = env.addSource(new 
FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
sourceStream.print("最源端的数据sourceStream");

//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
SingleOutputStreamOperator baseInfoStream = sourceStream.map(new 
MapFunction() {
@Override
public BaseInfo2 map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
String time = splits[0].substring(7, 19).replace("-", "").trim();
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, 
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, 
System.currentTimeMillis());
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element,
 recordTimestamp) -> element.getEvenTime()));
baseInfoStream.print("不加功能描述的 baseInfoStream");

//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
SingleOutputStreamOperator completeInfoStream = 
baseInfoStream.map(new MapFunction() {
@Override
public BaseInfo2 map(BaseInfo2 value) throws Exception {
//拿

flink 窗口触发计算的条件

2023-05-24 Thread
各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
相关代码以及样例数据如下:
|
package job;
import bean.MidInfo3;
import bean.Result;
import bean2.BaseInfo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import utils.DateUtil;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly9 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
//1、消费Kafka中的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource sourceStream = env.addSource(new 
FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
sourceStream.print("最源端的数据sourceStream");

//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据
SingleOutputStreamOperator baseInfoStream = sourceStream.map(new 
MapFunction() {
@Override
public BaseInfo2 map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间
String time = splits[0].substring(7, 19).replace("-", "").trim();
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, 
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, 
System.currentTimeMillis());
}

}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element,
 recordTimestamp) -> element.getEvenTime()));
baseInfoStream.print("不加功能描述的 baseInfoStream");

//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述
SingleOutputStreamOperator completeInfoStream = 
baseInfoStream.map(new MapFunction() {
@Override
public BaseInfo2 map(BaseInfo2 value) throws Exception {
//拿到数据中携带的数字的action
String actionId = value.getFuncId();
System.out.println("数据中的action编码是: " + actionId);
String actionName = null;

table api定义rowtime未生效

2023-05-16 Thread
各位老师好,以下是我的代码:

| Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), 
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), 
$("eventTime").rowtime());
tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = 
tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as 
minTime\n" +"FROM TABLE(CUMULATE(\n" +" TABLE 
midTable1"+//" TABLE "+ midTable +" , 
DESCRIPTOR(eventTime)\n" +" , INTERVAL '60' SECOND\n" + 
   " , INTERVAL '1' DAY))\n" +" GROUP BY 
window_start,window_end,funcId,funcIdDesc,serverIp,pk"); |
我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime 
timestamp is not defined. Please make sure that a proper TimestampAssigner is 
defined and the stream environment uses the EventTime time characteristic
想请教一下各位老师解决之法
| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: 回复:报错显示为bug

2023-05-16 Thread
好滴呀  谢谢各位老师


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年5月16日 08:46 |
| 收件人 |  ,
 |
| 主题 | Re: 回复:报错显示为bug |
Hi,

从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

你好,从报错来看是类型不兼容导致的。
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换

















At 2023-05-15 18:29:15, "小昌同学"  wrote:
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//使用侧输出流
OutputTag requestStream = new
OutputTag("requestStream") {
};
OutputTag answerStream = new
OutputTag("answerStream") {
};

//1、连接测试环境kafka的数据
String servers =
FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName =
FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource sourceStream = env.addSource(new
FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
<315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

//2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator baseInfoStream =
sourceStream.map(new MapFunction() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");

String[] splits = datas.split("\n");
HashMap dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp,
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});

//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator tagStream =
baseInfoStream.process(new MyProcessFunction(reques

回复:报错显示为bug

2023-05-15 Thread
er.java:4396)
 at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
 at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
 at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
 at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
 at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
 at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
 at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
 at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
 at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
 at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
 at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
 at 
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
 at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
 at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
 at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
 at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
 at 
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
 at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
 at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
 at 
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
 at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
 at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
 at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
 at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
 at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
 at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
 at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
 at 
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
 at 
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
 at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
 at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
 at 
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
 at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
 at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
 at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
 at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
 at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
 ... 22 more

Process finished with exit code 1

|


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | lxk |
| 发送日期 | 2023年5月15日 18:21 |
| 收件人 |  |
| 主题 | Re:报错显示为bug |
你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。

















在 2023-05-15 17:11:42,"小昌同学"  写道:
各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue. “
flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作


| |
小昌同学
|
|
ccc0606fight...@163.com
|


报错显示为bug

2023-05-15 Thread
各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue. “
flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作


| |
小昌同学
|
|
ccc0606fight...@163.com
|

flink 状态设置

2023-05-11 Thread
各位老师好,我这边使用的flink sql是" 
select funcId,funcIdDesc,serverIp,cast(min(maxTime-minTime) as varchar(200)) as 
minTime,pk from
(
 select
  a.funcId as funcId ,
  a.funcIdDesc as funcIdDesc,
  a.serverIp as serverIp,
  b.outTime as maxTime,
  a.outTime as minTime,
  concat(a.funcId,a.serverIp) as pk
 from tableRequest a
 inner join tableAnswer b
 on a.handleSerialNo=b.handleSerialNo
)
group by funcId,funcIdDesc,serverIp,pk‍‍‍"
考虑如果不对于状态进行管理,后续程序会出现问题,我这边想实现的状态管理是:我上述的这个sql计算的数据仅仅只是当天(24小时)的,等到第二天就把之前的全部状态全部清除掉,基于这样的场景我可以怎么设置什么参数管理状态,我自己设置参数为“tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));”,看官网的解释,感觉这样会有问题,idlestate是只要更新了就会重新设置过期时间,但是我想实现效果是不管是有咩有更新,只要不是属于今天的就全部清理掉。


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: 不同的流程使用不同的并行度

2023-05-10 Thread
好滴呀  谢谢各位老师指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | yidan zhao |
| 发送日期 | 2023年4月21日 10:50 |
| 收件人 |  |
| 主题 | Re: 不同的流程使用不同的并行度 |
从哪方面考虑,主要根据每个算子的工作复杂性,复杂性越高自然设置越高的并发好点。 其次实际运行时,也可以根据反压情况找到瓶颈进行调整。

Shammon FY  于2023年4月21日周五 09:04写道:

Hi

DataStream作业设置并发度有两种方式
1. 在ExecutionEnvironment通过setParallelism设置全局并发
2. 在DataStream中通过setParallelism为指定的datastream计算设置并发度

Best,
Shammon FY

On Fri, Apr 21, 2023 at 8:58 AM 小昌同学  wrote:



各位老师好,请教一下关于flink的并行度的问题;
我现在数据上游是kafka(四个分区),经过Flink
ETL处理后,实时落地到Kafka以及MYSQL,那我想在不同的阶段设置不同的并行度,这一块可以怎么使用,我使用的是DataStream API
还想请教一下就是关于并行度的这个设置,应该从哪些方面进行考虑啊,麻烦各位老师指教一下
| |
小昌同学
|
|
ccc0606fight...@163.com
|


不同的流程使用不同的并行度

2023-04-20 Thread


各位老师好,请教一下关于flink的并行度的问题;
我现在数据上游是kafka(四个分区),经过Flink 
ETL处理后,实时落地到Kafka以及MYSQL,那我想在不同的阶段设置不同的并行度,这一块可以怎么使用,我使用的是DataStream API
还想请教一下就是关于并行度的这个设置,应该从哪些方面进行考虑啊,麻烦各位老师指教一下
| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复:控制台打印出流式数据

2023-04-19 Thread
好滴呀  谢谢老师


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jason_H |
| 发送日期 | 2023年4月19日 16:06 |
| 收件人 | flink中文邮件组 |
| 主题 | 回复:控制台打印出流式数据 |
这个方法就可以打印在你本地的idea控制台里面,你试一下


| |
Jason_H
|
|
hyb_he...@163.com
|
 回复的原邮件 
| 发件人 | 小昌同学 |
| 发送日期 | 2023年4月19日 16:01 |
| 收件人 | user-zh |
| 抄送人 | user-zh |
| 主题 | 回复:控制台打印出流式数据 |
这个print是将数据打印再flink的stud out吧,我现在是再本地进行调试,想在本地上打印出来结果


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jason_H |
| 发送日期 | 2023年4月19日 15:58 |
| 收件人 | flink中文邮件组 |
| 主题 | 回复:控制台打印出流式数据 |
hi,你好
你应该使用 stream.print() 来打印流中的数据 不要system out 输出


| |
Jason_H
|
|
hyb_he...@163.com
|
 回复的原邮件 
| 发件人 | 小昌同学 |
| 发送日期 | 2023年4月19日 15:51 |
| 收件人 | user-zh |
| 主题 | 控制台打印出流式数据 |


各位老师好,请教一个问题,就是上游的数据源是Kafka,编辑工具是idea,再new  
FlinkKafkaConsumer后,得到一条流stream,我想看一下流中的数据,直接  
System.out.println(stream.toString);
但是从控制台打印结果来看,打印出来的还是地址值,请各位老师指导一下




| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复:flink命令行提交作业读取不到properties配置文件

2023-04-19 Thread
我这边的做法是将配置文件也当作一条流进行读取,程序会自动读取,不需要再任务启动的时候指定;希望对你有帮助呀


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jason_H |
| 发送日期 | 2023年4月19日 15:57 |
| 收件人 | flink中文邮件组 ,
user-zh-subscribe |
| 主题 | flink命令行提交作业读取不到properties配置文件 |
hi,大家好
我在使用命令行提交任务时,发现任务刚起来就会报错,根据错误发现没有读去到jar包中resource目录下的properties配置文件,导致在使用redis时,初始化报错
提交命令如下:
flink run -c com.test..etl.OdsChangeApplication 
/opt/dobrain/app/etl/test-etl-0.0.2-SNAPSHOT.jar \
-p 4 \
-job-name test-etl \


此处没有添加redis配置参数,但是配置文件中已经有默认的,提交运行后报错:
java.lang.IllegalArgumentException: template not initialized; call 
afterPropertiesSet() before using it
at org.springframework.util.Assert.isTrue(Assert.java:121) 
~[spring-core-5.2.14.RELEASE.jar:5.2.14.RELEASE]
at 
org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:204)
 ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
at 
org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)
 ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
at 
org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
 ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
at 
org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:53)
 ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE]
at com.test.etl.client.RedisService.getStringValue(RedisService.java:30) 
~[classes/:?]
at 
com.test.etl.manager.impl.RedisChangeManager.getCustId(RedisChangeManager.java:53)
 ~[classes/:?]
at 
com.test.etl.transformation.process.MsgHandleProcess.processElement(MsgHandleProcess.java:46)
 ~[classes/:?]
at 
com.test.etl.transformation.process.MsgHandleProcess.processElement(MsgHandleProcess.java:21)
 ~[classes/:?]
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 ~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) 
~[flink-streaming-java-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 ~[flink-runtime-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-runtime-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
~[flink-runtime-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[flink-runtime-1.15.2.jar:1.15.2]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]


我尝试在命令行添加了redis的参数,启动任务测试发现也会报如下的错误


请问大佬们,这个怎么解决,就是在命令行提交任务,怎么可以读取到jar包中定义的properties配置文件呢


| |
Jason_H
|
|
hyb_he...@163.com
|

回复:控制台打印出流式数据

2023-04-19 Thread
这个print是将数据打印再flink的stud out吧,我现在是再本地进行调试,想在本地上打印出来结果


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jason_H |
| 发送日期 | 2023年4月19日 15:58 |
| 收件人 | flink中文邮件组 |
| 主题 | 回复:控制台打印出流式数据 |
hi,你好
你应该使用 stream.print() 来打印流中的数据 不要system out 输出


| |
Jason_H
|
|
hyb_he...@163.com
|
 回复的原邮件 
| 发件人 | 小昌同学 |
| 发送日期 | 2023年4月19日 15:51 |
| 收件人 | user-zh |
| 主题 | 控制台打印出流式数据 |


各位老师好,请教一个问题,就是上游的数据源是Kafka,编辑工具是idea,再new  
FlinkKafkaConsumer后,得到一条流stream,我想看一下流中的数据,直接  
System.out.println(stream.toString);
但是从控制台打印结果来看,打印出来的还是地址值,请各位老师指导一下




| |
小昌同学
|
|
ccc0606fight...@163.com
|

控制台打印出流式数据

2023-04-19 Thread


各位老师好,请教一个问题,就是上游的数据源是Kafka,编辑工具是idea,再new  
FlinkKafkaConsumer后,得到一条流stream,我想看一下流中的数据,直接  
System.out.println(stream.toString);
但是从控制台打印结果来看,打印出来的还是地址值,请各位老师指导一下




| |
小昌同学
|
|
ccc0606fight...@163.com
|

流数据转化为json

2023-04-14 Thread
你好,请问一下上游的数据是
SingleOutputStreamOperator outPutInfoStream = 
keyedStream.process(new KeyStreamFunc());
数据样式为:InPutInfo[phone='123456',workId='001']
我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复:flink sink web ui显示为Sink: Unnamed

2023-04-14 Thread
好滴,谢谢各位老师


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | hjw |
| 发送日期 | 2023年4月14日 16:38 |
| 收件人 |  |
| 主题 | Re:flink sink web ui显示为Sink: Unnamed |
可以在算子后面调用.name()方法指定名称,方法参数就是算子名称。
比如需sink的流为stream
stream.sinkTo(Sink算子).name("sink-name")




--

Best,
Hjw





在 2023-04-14 16:26:35,"小昌同学"  写道:
我将流式数据输出到mysql,查看flink 自带的web ui界面,有一个sink节点显示为Sink: Unnamed ,这个针对sink节点可以命名嘛


| |
小昌同学
|
|
ccc0606fight...@163.com
|


flink sink web ui显示为Sink: Unnamed

2023-04-14 Thread
我将流式数据输出到mysql,查看flink 自带的web ui界面,有一个sink节点显示为Sink: Unnamed ,这个针对sink节点可以命名嘛


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复:打印不同流怎么进行区分

2023-04-14 Thread
好滴,谢谢您


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | 17610775726<17610775...@163.com> |
| 发送日期 | 2023年4月14日 10:27 |
| 收件人 | user-zh@flink.apache.org |
| 抄送人 | user-zh |
| 主题 | 回复:打印不同流怎么进行区分 |
Hi



Print 方法是可以传入一个参数的,用来标识某个流,比如 print(“a”); print(“b");


Best
JasonLee


 回复的原邮件 ----
| 发件人 | 小昌同学 |
| 发送日期 | 2023年04月14日 09:46 |
| 收件人 | user-zh |
| 主题 | 打印不同流怎么进行区分 |
你好,请问一下再一个程序中,有流与流之间的转换,比如说流A转换为流B,那我想看看流A,也想看看流B,请问我该怎么实现,直接print的话,再控制面板会乱掉


| |
小昌同学
|
|
ccc0606fight...@163.com
|

打印不同流怎么进行区分

2023-04-13 Thread
你好,请问一下再一个程序中,有流与流之间的转换,比如说流A转换为流B,那我想看看流A,也想看看流B,请问我该怎么实现,直接print的话,再控制面板会乱掉


| |
小昌同学
|
|
ccc0606fight...@163.com
|

flink sql upsert mysql问题

2023-03-27 Thread
  'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败
  'json.ignore-parse-errors' = 'true' -- 解析失败跳过
);
create view v_dm_cust_oact_prog_ri as
select
cust_id
,cust_nme
,cust_mob_tel
,cust_curr_step
,cust_curr_step_num
,cust_curr_step_occu_tm
,user_id
,FROM_UNIXTIME(UNIX_TIMESTAMP()) AS tech_sys_time
from
(
select
cust_id
,cust_nme
,cust_mob_tel
,cust_curr_step
,cust_curr_step_num
,cust_curr_step_occu_tm
,user_id
,ROW_NUMBER() OVER(PARTITION BY user_id,cust_curr_step ORDER BY 
cust_curr_step_occu_tm DESC) AS rn
from
(
select
t2.client_id as cust_id
,t2.client_name as cust_nme
,t2.mobile_tel as cust_mob_tel
,case when t2.active_datetime is not null then '开户成功'
when t5.business_flag_audit in ('1003','1011') then '人工审核'
when t1.business_flag = '22114' then '提交申请'
when t1.business_flag in ('22115','33500') or 
t5.business_flag_video in ('1200','1202','1203') then '视屏验证'
when t1.business_flag = '22113' then '绑定三方存管'
when t1.business_flag = '22112' then '设置密码'
when t1.business_flag = '22109' then '协议签署'
when t1.business_flag = '22108' then '开通账户选择'
when t1.business_flag = '22110' then '风险评测'
when t1.business_flag = '22106' then '填写基本资料'
when t1.business_flag = '22107' then '上传身份证'
when t1.business_flag = '22111' then '选择营业部'
when t1.business_flag = '12100' then '新开户:注册申请开户' end as 
cust_curr_step
,case when t2.active_datetime is not null then 13
when t5.business_flag_audit in ('1003','1011') then 12
when t1.business_flag = '22114' then 11
when t1.business_flag in ('22115','33500') or 
t5.business_flag_video in ('1200','1202','1203') then 10
when t1.business_flag = '22113' then 9
when t1.business_flag = '22112' then 8
when t1.business_flag = '22109' then 7
when t1.business_flag = '22108' then 6
when t1.business_flag = '22110' then 5
when t1.business_flag = '22106' then 4
when t1.business_flag = '22107' then 3
when t1.business_flag = '22111' then 2
when t1.business_flag = '12100' then 1 end as cust_curr_step_num
,case when t2.active_datetime is not null then t2.active_datetime
  when t5.business_flag_audit is not null then t5.curr_datetime
  when t5.business_flag_video is not null then 
t5.create_datetime
  else t1.curr_datetime end as cust_curr_step_occu_tm
,t1.user_id
from
(
select
curr_datetime
,user_id
,business_flag
from
(
select
replace(curr_datetime,'-','') as curr_datetime
,user_id
,business_flag
,ROW_NUMBER() OVER(PARTITION BY user_id,business_flag ORDER 
BY curr_datetime DESC) AS rn
from dm_crh_cust_oact_rec_ri
where business_flag in 
('22114','22113','22112','22109','22108','22110','22106','22107','22111','12100','22115','33500')
)t
where rn = 1
) t1
left join
(
select
user_id
,client_name
,mobile_tel
,replace(substr(active_datetime,1,19),'-','') as active_datetime
,client_id
from dm_crh_cust_info_ri
) t2
on t1.user_id = t2.user_id
left join
(
select
t1.user_id
,t1.join_position_str
,replace(t1.create_datetime,'-','') as create_datetime
,t1.business_flag AS business_flag_video
,t2.business_flag AS business_flag_audit
,replace(t2.curr_datetime,'-','') as curr_datetime
from dm_crh_user_vidro_ri t1
left join dm_crh_audit_rec_ri t2
on t1.join_position_str = t2.request_no
where t1.business_flag in ('1200','1202','1203')
or t2.business_flag in ('1003','1011')
) t5
on t1.user_id = t5.user_id
) t
where cust_curr_step is not null
) t
where rn = 1
;
insert into dm_cust_oact_prog_ri_print
select
cust_id
,cust_nme
,cust_mob_tel
,cust_curr_step
,cust_curr_step_num
,cust_curr_step_occu_tm
,user_id
,tech_sys_time
from v_dm_cust_oact_prog_ri
;
insert into dm_cust_oact_prog_ri
select
cust_id
,cust_nme
,cust_mob_tel
,cust_curr_step
,cust_curr_step_num
,cust_curr_step_occu_tm
,user_id
,tech_sys_time
from v_dm_cust_oact_prog_ri】
| |
小昌同学
|
|
ccc0606fight

回复: flink写入mysql数据异常

2023-03-23 Thread
好滴呀,谢谢您的建议;
https://www.yuque.com/g/echochangtongxue/yxxdbg/iyfqa9fh34i5lssu/collaborator/join?token=KZCQVX5pqH3rmPNP#
 邀请你共同编辑文档《Flink SQL写入到mysql的问题》
我创建了一个语雀,我将代码以及问题都写在文档里了,麻烦大佬们帮忙看一下问题呀


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Shammon FY |
| 发送日期 | 2023年3月24日 13:08 |
| 收件人 |  |
| 主题 | Re: flink写入mysql数据异常 |
Hi

你可以将问题描述和sql放在一个外部文档,例如google文档,然后将文档连接发在邮件里

Best,
Shammon FY

On Fri, Mar 24, 2023 at 10:58 AM 孙冬燕 
wrote:

退订
--
发件人:小昌同学 
发送时间:2023年3月24日(星期五) 10:57
收件人:user-zh 
抄 送:user-zh 
主 题:回复: flink写入mysql数据异常
您好, 可能是我这边上传附件的方式不对,我场景描述的不够准确;
您看是否方便加一个微信呢【15956076613】,我将文档和截图发您,帮忙看一下;
谢谢大佬的指导
| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jane Chan |
| 发送日期 | 2023年3月23日 20:40 |
| 收件人 |  |
| 主题 | Re: flink写入mysql数据异常 |
附件还是没有收到哦.
Flink SQL 支持 INSERT INTO table_identifier (column_identifier1 [,
column_identifier2, ...]) 插入指定列, 具体语法可以参考 [1]
[1]

https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries
<
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries

On Thu, Mar 23, 2023 at 5:35 PM 小昌同学  wrote:
您好,我刚刚重新上传了附件;是的,Flink
SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导
小昌同学
ccc0606fight...@163.com
<
https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D>
<
https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D>

 回复的原邮件 
发件人 Jane Chan 
发送日期 2023年3月23日 15:42
收件人  
主题 Re: flink写入mysql数据异常
Hi,
没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键,
并且与数据库中物理表主键保持一致. 可以参考 [1].
[1]

https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86
<
https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86

On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:
大佬,你好,代码上传在附件中了;
就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法?
小昌同学
ccc0606fight...@163.com
<

https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D
<
https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D

 回复的原邮件 
发件人 Jane Chan 
发送日期 2023年3月23日 14:23
收件人  
主题 Re: flink写入mysql数据异常
可以把完整 SQL 发出来看看
祝好!
Jane
On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:
使用flink
sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert
,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊
| |
小昌同学
|
|
ccc0606fight...@163.com
|



回复: flink写入mysql数据异常

2023-03-23 Thread
您好, 可能是我这边上传附件的方式不对,我场景描述的不够准确;
您看是否方便加一个微信呢【15956076613】,我将文档和截图发您,帮忙看一下;
谢谢大佬的指导


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jane Chan |
| 发送日期 | 2023年3月23日 20:40 |
| 收件人 |  |
| 主题 | Re: flink写入mysql数据异常 |
附件还是没有收到哦.

Flink SQL 支持 INSERT INTO table_identifier (column_identifier1 [,
column_identifier2, ...]) 插入指定列, 具体语法可以参考 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries

On Thu, Mar 23, 2023 at 5:35 PM 小昌同学  wrote:

您好,我刚刚重新上传了附件;是的,Flink
SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导
小昌同学
ccc0606fight...@163.com

<https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D>
 回复的原邮件 
发件人 Jane Chan 
发送日期 2023年3月23日 15:42
收件人  
主题 Re: flink写入mysql数据异常
Hi,

没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键,
并且与数据库中物理表主键保持一致. 可以参考 [1].

[1]

https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86

On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:

大佬,你好,代码上传在附件中了;
就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法?

小昌同学
ccc0606fight...@163.com

<
https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D

 回复的原邮件 
发件人 Jane Chan 
发送日期 2023年3月23日 14:23
收件人  
主题 Re: flink写入mysql数据异常
可以把完整 SQL 发出来看看

祝好!
Jane

On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:

使用flink
sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert
,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊


| |
小昌同学
|
|
ccc0606fight...@163.com
|





回复: flink写入mysql数据异常

2023-03-23 Thread
您好,我刚刚重新上传了附件;是的,Flink 
SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导
| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jane Chan |
| 发送日期 | 2023年3月23日 15:42 |
| 收件人 |  |
| 主题 | Re: flink写入mysql数据异常 |
Hi,

没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键,
并且与数据库中物理表主键保持一致. 可以参考 [1].

[1]
https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86

On Thu, Mar 23, 2023 at 2:54 PM 小昌同学  wrote:

大佬,你好,代码上传在附件中了;
就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法?

小昌同学
ccc0606fight...@163.com

<https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D>
 回复的原邮件 
发件人 Jane Chan 
发送日期 2023年3月23日 14:23
收件人  
主题 Re: flink写入mysql数据异常
可以把完整 SQL 发出来看看

祝好!
Jane

On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:

使用flink
sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert
,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊


| |
小昌同学
|
|
ccc0606fight...@163.com
|




回复: flink写入mysql数据异常

2023-03-23 Thread
大佬,你好,代码上传在附件中了;
就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法?


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Jane Chan |
| 发送日期 | 2023年3月23日 14:23 |
| 收件人 |  |
| 主题 | Re: flink写入mysql数据异常 |
可以把完整 SQL 发出来看看

祝好!
Jane

On Thu, Mar 23, 2023 at 1:39 PM 小昌同学  wrote:

使用flink
sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert
,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊


| |
小昌同学
|
|
ccc0606fight...@163.com
|


flink写入mysql数据异常

2023-03-22 Thread
使用flink sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert 
,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊


| |
小昌同学
|
|
ccc0606fight...@163.com
|

Re: flink sql

2023-03-03 Thread
好滴  谢谢大佬呀


| |
小昌同学
|
|
ccc0606fight...@163.com
|
 Replied Message 
| From | 17610775726<17610775...@163.com> |
| Date | 3/3/2023 15:55 |
| To | user-zh@flink.apache.org |
| Cc | user-zh |
| Subject | Re:flink sql |
Hi



可以通过设置 pipeline.operator-chaining = false 来实现。


Best
JasonLee


 Replied Message 
| From | 小昌同学 |
| Date | 03/3/2023 15:50 |
| To | user-zh |
| Subject | flink sql |
各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能


| |
小昌同学
|
|
ccc0606fight...@163.com
|

flink sql

2023-03-02 Thread
各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能


| |
小昌同学
|
|
ccc0606fight...@163.com
|

flink状态恢复

2023-01-11 Thread
我有一个flink任务运行很久了,统计的指标是一些聚合值,但是现在业务想要增加字段,请问一下这个场景大家是怎么处理的啊


| |
小昌同学
|
|
ccc0606fight...@163.com
|

回复: 任务启动异常导致Flink服务挂掉,无法启动Flink服务

2022-09-16 Thread
截图一下日志报错的exception看看


| |
小昌
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | yidan zhao |
| 发送日期 | 2022年9月16日 14:20 |
| 收件人 | user-zh |
| 主题 | Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务 |
什么部署模式。

Summer  于2022年9月16日周五 13:57写道:


Flink版本:1.13.3
我有一个Flink Sql的任务,也生成了checkpoint,但是执行过程出现Execption,导致整个Flink JobManger无法启动。
我再重启Flink的时候,这个FlinkSql任务由于一直抛异常导致Flink进程启动不起来。
请问有什么办法取消这个任务。
























回复:flink table API使用

2022-09-05 Thread
感谢感谢大佬指点


| |
应聘者昌呈呈
|
|
ccc0606fight...@163.com
|
 回复的原邮件 
| 发件人 | Xuyang |
| 发送日期 | 2022年9月6日 00:03 |
| 收件人 |  |
| 主题 | Re:flink table API使用 |
Hi, 可以类似这样写 “.filter($("a").isGreater(10)) "。 更多的使用方法可以参考[1]




[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java




--

Best!
Xuyang





在 2022-09-05 20:53:03,"小昌同学"  写道:


Table result = kafka_item.groupBy($("trans_number"))   
.select($("trans_number"),$("sales_amount").sum().as("sum_amount"))
.filter($("sum_amount "));
各位大佬  请教一个问题  我这边想通过flink table API 达到这样一个效果:
根据trans_number进行分组  然后对另一个字段进行sum计算  然后我想最后进行过滤的时候 过滤出来这个sum值大于100的
我这个后续怎么使用API啊  这个filter算子咋用呀
| |
小昌
|
|
ccc0606fight...@163.com
|


flink table API使用

2022-09-05 Thread


Table result = kafka_item.groupBy($("trans_number"))   
.select($("trans_number"),$("sales_amount").sum().as("sum_amount"))
   .filter($("sum_amount "));
各位大佬  请教一个问题  我这边想通过flink table API 达到这样一个效果:
根据trans_number进行分组  然后对另一个字段进行sum计算  然后我想最后进行过滤的时候 过滤出来这个sum值大于100的
我这个后续怎么使用API啊  这个filter算子咋用呀
| |
小昌
|
|
ccc0606fight...@163.com
|

flink sql解析kafka数据

2022-06-30 Thread
各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
   trans_number   STRING,
   end_timestamp  STRING,
   return_flagSTRING,
   commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '6',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '165373920',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fight...@163.com
|

Unaligned Checkpoint

2022-06-11 Thread
大佬们可以说说Unaligned Checkpoint的实现吗  看了不少文档 没有太看懂  我如果想在sql里面实现  这个该怎么设置啊  请大佬们指教


| |
小昌同学
|
|
ccc0606fight...@163.com
|