回复: Flink SQL的状态清理
你好,老师,我也是这样设置的,我这边是flink sql client,但是我去flink web ui界面并没有看到这个配置生效。 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jane Chan | | 发送日期 | 2023年9月25日 11:24 | | 收件人 | | | 主题 | Re: Flink SQL的状态清理 | Hi, 可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1] [1] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86 Best, Jane On Thu, Sep 21, 2023 at 5:17 PM faronzz wrote: 试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s") | | faronzz | | faro...@163.com | 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2023年09月21日 17:06 | | 收件人 | user-zh | | 主题 | Flink SQL的状态清理 | 各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛 | | 小昌同学 | | ccc0606fight...@163.com |
Flink SQL的状态清理
各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛 | | 小昌同学 | | ccc0606fight...@163.com |
回复: Flink 窗口触发条件
好滴呀 谢谢各位老师指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Yanfei Lei | | 发送日期 | 2023年8月10日 11:50 | | 收件人 | | | 主题 | Re: Flink 窗口触发条件 | hi, 感觉和[1]的问题比较像,事件时间的window在onElement和onEventTime时会触发,这两个方法又会根据watermark判断,可以看看o.a.f.table.runtime.operators.window.triggers包和o.a.f.table.runtime.operators.wmassigners包。 [1] https://juejin.cn/post/6850418110010179597 小昌同学 于2023年8月10日周四 10:52写道: 各位老师好,我这边在使用Flink的事件时间窗口的时候,关于窗口触发的条件我有一点疑问想咨询一下各位老师 我是开了一个2分钟的事件时间的窗口,但是等到两分钟后窗口并没有主动触发,等我后面再发一条数据的时候,窗口再进行了触发 所以我想请问一下窗口的触发机制不是时间点嘛,而是非要等到下一条数据发送,依赖于下一条数据携带的时间戳大于窗口的结束时间,上一个窗口才会真正的触发嘛 请各位老师指导一下 | | 小昌同学 | | ccc0606fight...@163.com | -- Best, Yanfei
Flink 窗口触发条件
各位老师好,我这边在使用Flink的事件时间窗口的时候,关于窗口触发的条件我有一点疑问想咨询一下各位老师 我是开了一个2分钟的事件时间的窗口,但是等到两分钟后窗口并没有主动触发,等我后面再发一条数据的时候,窗口再进行了触发 所以我想请问一下窗口的触发机制不是时间点嘛,而是非要等到下一条数据发送,依赖于下一条数据携带的时间戳大于窗口的结束时间,上一个窗口才会真正的触发嘛 请各位老师指导一下 | | 小昌同学 | | ccc0606fight...@163.com |
回复: 回复: Flink消费MySQL
好的 谢谢各位老师的指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | ron | | 发送日期 | 2023年8月10日 00:51 | | 收件人 | | | 主题 | Re: 回复: Flink消费MySQL | Hi, 建议通过CDC实时读,然后用Flink的双流Join进行关联。 -原始邮件- 发件人: "小昌同学" 发送时间: 2023-08-08 11:10:19 (星期二) 收件人: user-zh 抄送: user-zh 主题: 回复: Flink消费MySQL 谢谢老师指导呀; 我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据 老师这一块有更好的建议嘛 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年8月8日 10:37 | | 收件人 | | | 主题 | Re: Flink消费MySQL | Hi, 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况 Best, Shammon FY On Mon, Aug 7, 2023 at 5:04 PM 小昌同学 wrote: 各位老师好 ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊; 以下是我的代码: | public class MysqlSource2 extends RichSourceFunction { PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql="select * from actiontype;"; ps = connection.prepareStatement(sql); } private static Connection getConnection(){ Connection con=null; String driverClass= FlinkConfig.config.getProperty("driverClass"); String url=FlinkConfig.config.getProperty("jdbcUrl"); String user=FlinkConfig.config.getProperty("jdbcUser"); String passWord=FlinkConfig.config.getProperty("passWord"); try { Class.forName(driverClass); con= DriverManager.getConnection(url,user,passWord); } catch (Exception e) { throw new RuntimeException(e); } return con; } @Override public void run(SourceContext ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()){ ActionType actionType = new ActionType( resultSet.getString("action"), resultSet.getString("action_name") ); ctx.collect(actionType); } } @Override public void close() throws Exception { super.close(); if (null!=connection){ connection.close(); } if (null!=ps){ ps.close(); } } @Override public void cancel() { } }; | | | 小昌同学 | | ccc0606fight...@163.com | -- Best, Ron
回复: Flink消费MySQL
谢谢老师指导呀; 我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据 老师这一块有更好的建议嘛 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年8月8日 10:37 | | 收件人 | | | 主题 | Re: Flink消费MySQL | Hi, 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况 Best, Shammon FY On Mon, Aug 7, 2023 at 5:04 PM 小昌同学 wrote: 各位老师好 ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊; 以下是我的代码: | public class MysqlSource2 extends RichSourceFunction { PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql="select * from actiontype;"; ps = connection.prepareStatement(sql); } private static Connection getConnection(){ Connection con=null; String driverClass= FlinkConfig.config.getProperty("driverClass"); String url=FlinkConfig.config.getProperty("jdbcUrl"); String user=FlinkConfig.config.getProperty("jdbcUser"); String passWord=FlinkConfig.config.getProperty("passWord"); try { Class.forName(driverClass); con= DriverManager.getConnection(url,user,passWord); } catch (Exception e) { throw new RuntimeException(e); } return con; } @Override public void run(SourceContext ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()){ ActionType actionType = new ActionType( resultSet.getString("action"), resultSet.getString("action_name") ); ctx.collect(actionType); } } @Override public void close() throws Exception { super.close(); if (null!=connection){ connection.close(); } if (null!=ps){ ps.close(); } } @Override public void cancel() { } }; | | | 小昌同学 | | ccc0606fight...@163.com |
Flink消费MySQL
各位老师好 ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊; 以下是我的代码: | public class MysqlSource2 extends RichSourceFunction { PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql="select * from actiontype;"; ps = connection.prepareStatement(sql); } private static Connection getConnection(){ Connection con=null; String driverClass= FlinkConfig.config.getProperty("driverClass"); String url=FlinkConfig.config.getProperty("jdbcUrl"); String user=FlinkConfig.config.getProperty("jdbcUser"); String passWord=FlinkConfig.config.getProperty("passWord"); try { Class.forName(driverClass); con= DriverManager.getConnection(url,user,passWord); } catch (Exception e) { throw new RuntimeException(e); } return con; } @Override public void run(SourceContext ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()){ ActionType actionType = new ActionType( resultSet.getString("action"), resultSet.getString("action_name") ); ctx.collect(actionType); } } @Override public void close() throws Exception { super.close(); if (null!=connection){ connection.close(); } if (null!=ps){ ps.close(); } } @Override public void cancel() { } }; | | | 小昌同学 | | ccc0606fight...@163.com |
回复: JdbcSink引发的IO过高
你好,感谢老师回复 `insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,我理解这两种情况的话,对于数据库来说,就是一次插入与两次插入的问题了吧,要是数据量大的话,感觉对性能还是有影响的 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年7月26日 10:52 | | 收件人 | | | 主题 | Re: JdbcSink引发的IO过高 | Hi, 目前JdbcSink会为每个Sink创建PreparedStatement,当进行batch数据处理时,会先调用PreparedStatement的addBatch()函数将数据放入缓存,到达flush条件后调用executeBatch()函数批量发送数据到jdbc server,这样会节省网络IO。 具体到数据库侧,我理解执行 `insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10);`或者分成两条数据插入,对底层存储可能差别不大,因为插入的数据量不会减少,具体你可以观察一下 Best, Shammon FY On Tue, Jul 25, 2023 at 4:02 PM 小昌同学 wrote: 各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师: 我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式 是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10); 或者是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6) insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10) 如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO | errorStream.addSink(JdbcSink.sink( "insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)", (statement, result) -> { statement.setString(1,result.getAction()); statement.setString(2,result.getServerIp()); statement.setString(3,result.getHandleSerialno()); statement.setString(4,result.getMd5Num()); statement.setString(5,result.getInsertTime()); statement.setString(6, result.getDateTime()); }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("222") .build() )).name("sink-error-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |
JdbcSink引发的IO过高
各位老师好,我这边在使用Flink 的JdbcSink的时候,有一个疑问想请教一下各位老师: 我的代码如下:我代码中设定的每一个批次插入1000条,或者是每隔200ms插入一次数据,但是由于我司musql资源不行,从监控页面看插入的IO过高,我想请教一下我使用这样的insert语句,当我积累了1000条,是怎么样的格式 是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6),(1,2,3,4,9,10); 或者是 insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6) insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10) 如果是第二种情况,有什么样的方式可以转换为第一种情况,这样的话会大大减少IO | errorStream.addSink(JdbcSink.sink( "insert into error_md5_info (action, serverIp, handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)", (statement, result) -> { statement.setString(1,result.getAction()); statement.setString(2,result.getServerIp()); statement.setString(3,result.getHandleSerialno()); statement.setString(4,result.getMd5Num()); statement.setString(5,result.getInsertTime()); statement.setString(6, result.getDateTime()); }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("222") .build() )).name("sink-error-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |
回复: flink jdbcsink 连接数的问题
你好,老师,感谢你的回复; 我在MySQL中设置的主键不是自增主键,是一个业务主键的,所以我理解是不是这一块只能是设置并发度为1进行数据的插入 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | yidan zhao | | 发送日期 | 2023年5月31日 12:30 | | 收件人 | | | 主题 | Re: flink jdbcsink 连接数的问题 | 你好,这个问题和flink无关,看你主键实现机制吧,如果是自增,那就是mysql级别自动实现的自增,跟flink搭不上关系的。 小昌同学 于2023年5月31日周三 09:41写道: 老师,你好,再请教一下,连接数与并行度有关系的话,如果插入数据的MySQL是有主键的话,是不是连接数据也就是并行度只能为1啦呀,如果是多个并行度的话,可能会造成主键冲突; 感谢各位老师的指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | lxk | | 发送日期 | 2023年5月30日 14:30 | | 收件人 | | | 主题 | Re:flink jdbcsink 连接数的问题 | hi, jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。 关于连接数,则是跟你的并行度有关。 在 2023-05-30 13:55:57,"小昌同学" 写道: 各位老师,请教一下关于flink jdbcsink 连接数的问题; 我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀; 谢谢各位老师的指导 | outPutInfoStream.addSink(JdbcSink.sink( "REPLACE into InputInfo (breakCode, breakName, breakDuration, breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs) values (?,?,?,?,?,?,?,?,?,?)", (statement, InPutInfo) -> { statement.setString(1,InPutInfo.getBreakCode()); statement.setString(2,InPutInfo.getBreakName()); statement.setLong(3,InPutInfo.getBreakDuration()); statement.setString(4,InPutInfo.getBreakRule()); statement.setString(5,InPutInfo.getBreakPrimaryKey()); statement.setString(6, InPutInfo.getBreakStep()); statement.setString(7, InPutInfo.getBreakStepType()); statement.setString(8,InPutInfo.getBreakTime()); statement.setString(9, DateUtil.format(new Date())); statement.setString(10, String.valueOf(InPutInfo.getBreakArgs())); }, JdbcExecutionOptions.builder() .withBatchSize(10) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("111") .build() )).name("sink-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |
回复: flink 输出异常数据
你好,老师,感谢你的回复; 您说的打印到日志文件,是需要配置flink 的logback.xml 嘛,这一块的配置有一个小demo可以参考嘛 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年6月1日 10:33 | | 收件人 | | | 主题 | Re: flink 输出异常数据 | Hi 可以看一下报空指针的具体异常栈,如果是你的业务代码,可以在你的处理逻辑里加上一些判断信息并打印到日志文件;如果不是你的业务代码,可以贴一下具体的异常栈信息。 On Wed, May 31, 2023 at 12:31 PM yidan zhao wrote: 这个得靠你自己打日志吧,在可能出NPE的地方 try catch 到,然后打印原始记录。 小昌同学 于2023年5月29日周一 18:30写道: 你好,数据源是kafka,使用的是stream api | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年5月29日 15:29 | | 收件人 | | | 主题 | Re: flink 输出异常数据 | Hi, 你使用的数据源是什么呢?Kafka 吗?用的是 FlinkSQL 还是 DataStream API 呢? 方便把异常栈贴一下吗 Best, Weihua On Mon, May 29, 2023 at 1:36 PM 小昌同学 wrote: 各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导 | | 小昌同学 | | ccc0606fight...@163.com |
回复: flink web ui显示问题
你好,老师,感谢回复,我这边将截图放在了腾讯文档中,请查收; 感谢各位老师的指导 【腾讯文档】flink web ui https://docs.qq.com/sheet/DYkZ0Q0prRWJxcER4?tab=BB08J2 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年5月31日 09:59 | | 收件人 | | | 主题 | Re: flink web ui显示问题 | Hi, 好像没有收到附件或者文档,你可以检查确认一下 Best, Shammon FY On Wed, May 31, 2023 at 9:52 AM 小昌同学 wrote: 各位老师好,请教一个关于flink web ui的显示问题; 具体的显示异常截图的我以附件的形式放在文档中,我的疑惑是web ui上面已经显示watermark,但是看detail的时候显示不是watermark; 感谢各位老师指导 小昌同学 ccc0606fight...@163.com <https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D>
flink web ui显示问题
各位老师好,请教一个关于flink web ui的显示问题; 具体的显示异常截图的我以附件的形式放在文档中,我的疑惑是web ui上面已经显示watermark,但是看detail的时候显示不是watermark; 感谢各位老师指导 | | 小昌同学 | | ccc0606fight...@163.com |
回复:flink jdbcsink 连接数的问题
老师,你好,再请教一下,连接数与并行度有关系的话,如果插入数据的MySQL是有主键的话,是不是连接数据也就是并行度只能为1啦呀,如果是多个并行度的话,可能会造成主键冲突; 感谢各位老师的指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | lxk | | 发送日期 | 2023年5月30日 14:30 | | 收件人 | | | 主题 | Re:flink jdbcsink 连接数的问题 | hi, jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。 关于连接数,则是跟你的并行度有关。 在 2023-05-30 13:55:57,"小昌同学" 写道: 各位老师,请教一下关于flink jdbcsink 连接数的问题; 我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀; 谢谢各位老师的指导 | outPutInfoStream.addSink(JdbcSink.sink( "REPLACE into InputInfo (breakCode, breakName, breakDuration, breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs) values (?,?,?,?,?,?,?,?,?,?)", (statement, InPutInfo) -> { statement.setString(1,InPutInfo.getBreakCode()); statement.setString(2,InPutInfo.getBreakName()); statement.setLong(3,InPutInfo.getBreakDuration()); statement.setString(4,InPutInfo.getBreakRule()); statement.setString(5,InPutInfo.getBreakPrimaryKey()); statement.setString(6, InPutInfo.getBreakStep()); statement.setString(7, InPutInfo.getBreakStepType()); statement.setString(8,InPutInfo.getBreakTime()); statement.setString(9, DateUtil.format(new Date())); statement.setString(10, String.valueOf(InPutInfo.getBreakArgs())); }, JdbcExecutionOptions.builder() .withBatchSize(10) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("111") .build() )).name("sink-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |
回复:flink jdbcsink 连接数的问题
好滴呀 谢谢老师指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | lxk | | 发送日期 | 2023年5月30日 14:30 | | 收件人 | | | 主题 | Re:flink jdbcsink 连接数的问题 | hi, jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。 关于连接数,则是跟你的并行度有关。 在 2023-05-30 13:55:57,"小昌同学" 写道: 各位老师,请教一下关于flink jdbcsink 连接数的问题; 我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀; 谢谢各位老师的指导 | outPutInfoStream.addSink(JdbcSink.sink( "REPLACE into InputInfo (breakCode, breakName, breakDuration, breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs) values (?,?,?,?,?,?,?,?,?,?)", (statement, InPutInfo) -> { statement.setString(1,InPutInfo.getBreakCode()); statement.setString(2,InPutInfo.getBreakName()); statement.setLong(3,InPutInfo.getBreakDuration()); statement.setString(4,InPutInfo.getBreakRule()); statement.setString(5,InPutInfo.getBreakPrimaryKey()); statement.setString(6, InPutInfo.getBreakStep()); statement.setString(7, InPutInfo.getBreakStepType()); statement.setString(8,InPutInfo.getBreakTime()); statement.setString(9, DateUtil.format(new Date())); statement.setString(10, String.valueOf(InPutInfo.getBreakArgs())); }, JdbcExecutionOptions.builder() .withBatchSize(10) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("111") .build() )).name("sink-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |
flink jdbcsink 连接数的问题
各位老师,请教一下关于flink jdbcsink 连接数的问题; 我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀; 谢谢各位老师的指导 | outPutInfoStream.addSink(JdbcSink.sink( "REPLACE into InputInfo (breakCode, breakName, breakDuration, breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs) values (?,?,?,?,?,?,?,?,?,?)", (statement, InPutInfo) -> { statement.setString(1,InPutInfo.getBreakCode()); statement.setString(2,InPutInfo.getBreakName()); statement.setLong(3,InPutInfo.getBreakDuration()); statement.setString(4,InPutInfo.getBreakRule()); statement.setString(5,InPutInfo.getBreakPrimaryKey()); statement.setString(6, InPutInfo.getBreakStep()); statement.setString(7, InPutInfo.getBreakStepType()); statement.setString(8,InPutInfo.getBreakTime()); statement.setString(9, DateUtil.format(new Date())); statement.setString(10, String.valueOf(InPutInfo.getBreakArgs())); }, JdbcExecutionOptions.builder() .withBatchSize(10) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true") .withDriverName("com.mysql.jdbc.Driver") .withUsername("111") .withPassword("111") .build() )).name("sink-mysql"); | | | 小昌同学 | | ccc0606fight...@163.com |
回复: flink 输出异常数据
你好,数据源是kafka,使用的是stream api | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年5月29日 15:29 | | 收件人 | | | 主题 | Re: flink 输出异常数据 | Hi, 你使用的数据源是什么呢?Kafka 吗?用的是 FlinkSQL 还是 DataStream API 呢? 方便把异常栈贴一下吗 Best, Weihua On Mon, May 29, 2023 at 1:36 PM 小昌同学 wrote: 各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导 | | 小昌同学 | | ccc0606fight...@163.com |
flink 输出异常数据
各位老师,我有一个作业运行很久了,但是最近源系统有一些脏数据导致作业运行失败,看yarn的日志报错是空指针,但是我现在想把那一条脏数据捕获到,请问一下有啥办法吗?谢谢各位老师的指导 | | 小昌同学 | | ccc0606fight...@163.com |
回复: flink 窗口触发计算的条件
请教一下老师,您说的【同样数据的话,水印没有推进,窗口就不会触发】是不是意思是发送相同的数据,数据本身携带的时间戳是一样的,达不到水位线触发窗口的标准呀? 还有两个问题想请教一下各位老师: 1、事件时间窗口的闭合是取决于下一条数据所携带的时间戳嘛,只有当下一条数据携带的时间戳大于上一个窗口的endTime,窗口才会触发,如果是这个样子的话,那如果一个最后一个窗口怎么触发啊 2、我想使用stream api去打印出来窗口的起始时间以及结束时间,这个是哪一个api呀 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | lxk | | 发送日期 | 2023年5月25日 10:14 | | 收件人 | | | 主题 | Re:回复: flink 窗口触发计算的条件 | 你好,可以先看看官方文档中关于事件时间和水印的介绍 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/ 如果你发了多条数据,但是都是同样数据的话,水印没有推进,窗口就不会触发 在 2023-05-25 10:00:36,"小昌同学" 写道: 是的 我发送了很多数据,发现窗口还是没有触发 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | yidan zhao | | 发送日期 | 2023年5月25日 09:59 | | 收件人 | | | 主题 | Re: flink 窗口触发计算的条件 | 如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。 小昌同学 于2023年5月25日周四 09:32写道: 各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题; 我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒, 但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在: 相关代码以及样例数据如下: | package job; import bean.MidInfo3; import bean.Result; import bean2.BaseInfo2; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import config.FlinkConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import utils.DateUtil; import utils.JdbcUtil; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.time.Duration; import java.util.HashMap; import java.util.Properties; public class RytLogAnly9 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.disableOperatorChaining(); //1、消费Kafka中的数据 String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers"); String topicName = FlinkConfig.config.getProperty("dev_topicName"); String groupId = FlinkConfig.config.getProperty("dev_groupId"); String devMode = FlinkConfig.config.getProperty("dev_mode"); Properties prop = new Properties(); prop.setProperty("bootstrap.servers", servers); prop.setProperty("group.id", groupId); prop.setProperty("auto.offset.reset", devMode); DataStreamSource sourceStream = env.addSource(new FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop)); sourceStream.print("最源端的数据sourceStream"); //2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据 SingleOutputStreamOperator baseInfoStream = sourceStream.map(new MapFunction() { @Override public BaseInfo2 map(String value) throws Exception { JSONObject jsonObject = JSON.parseObject(value); //获取到不同的服务器IP String serverIp = jsonObject.getString("ip"); //获取到不同的data的数据 String datas = jsonObject.getString("data"); String[] splits = datas.split("\n"); HashMap dataMap = new HashMap<>(); //将time填充到自定义类型中,用来判断同一个num的请求以及应答时间 String time = splits[0].substring(7, 19).replace("-", "").trim(); //将subData填充到自定义类型中,用来判断时请求还是应答 String subData = datas.substring(0, 10); for (int i = 0; i < splits.length; i++) { if (splits[i].contains("=")) { splits[i] = splits[i].replaceFirst("=", "&"); String[] temp = splits[i].split("&"); if (temp.length > 1) { dataMap.put(temp[0].toLowerCase(), temp[1]); } } } return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的
回复: flink 窗口触发计算的条件
是的 我发送了很多数据,发现窗口还是没有触发 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | yidan zhao | | 发送日期 | 2023年5月25日 09:59 | | 收件人 | | | 主题 | Re: flink 窗口触发计算的条件 | 如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。 小昌同学 于2023年5月25日周四 09:32写道: 各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题; 我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒, 但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在: 相关代码以及样例数据如下: | package job; import bean.MidInfo3; import bean.Result; import bean2.BaseInfo2; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import config.FlinkConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import utils.DateUtil; import utils.JdbcUtil; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.time.Duration; import java.util.HashMap; import java.util.Properties; public class RytLogAnly9 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.disableOperatorChaining(); //1、消费Kafka中的数据 String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers"); String topicName = FlinkConfig.config.getProperty("dev_topicName"); String groupId = FlinkConfig.config.getProperty("dev_groupId"); String devMode = FlinkConfig.config.getProperty("dev_mode"); Properties prop = new Properties(); prop.setProperty("bootstrap.servers", servers); prop.setProperty("group.id", groupId); prop.setProperty("auto.offset.reset", devMode); DataStreamSource sourceStream = env.addSource(new FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop)); sourceStream.print("最源端的数据sourceStream"); //2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据 SingleOutputStreamOperator baseInfoStream = sourceStream.map(new MapFunction() { @Override public BaseInfo2 map(String value) throws Exception { JSONObject jsonObject = JSON.parseObject(value); //获取到不同的服务器IP String serverIp = jsonObject.getString("ip"); //获取到不同的data的数据 String datas = jsonObject.getString("data"); String[] splits = datas.split("\n"); HashMap dataMap = new HashMap<>(); //将time填充到自定义类型中,用来判断同一个num的请求以及应答时间 String time = splits[0].substring(7, 19).replace("-", "").trim(); //将subData填充到自定义类型中,用来判断时请求还是应答 String subData = datas.substring(0, 10); for (int i = 0; i < splits.length; i++) { if (splits[i].contains("=")) { splits[i] = splits[i].replaceFirst("=", "&"); String[] temp = splits[i].split("&"); if (temp.length > 1) { dataMap.put(temp[0].toLowerCase(), temp[1]); } } } return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, System.currentTimeMillis()); } }).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime())); baseInfoStream.print("不加功能描述的 baseInfoStream"); //3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述 SingleOutputStreamOperator completeInfoStream = baseInfoStream.map(new MapFunction() { @Override public BaseInfo2 map(BaseInfo2 value) throws Exception { //拿
flink 窗口触发计算的条件
各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题; 我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒, 但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在: 相关代码以及样例数据如下: | package job; import bean.MidInfo3; import bean.Result; import bean2.BaseInfo2; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import config.FlinkConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import utils.DateUtil; import utils.JdbcUtil; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.time.Duration; import java.util.HashMap; import java.util.Properties; public class RytLogAnly9 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.disableOperatorChaining(); //1、消费Kafka中的数据 String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers"); String topicName = FlinkConfig.config.getProperty("dev_topicName"); String groupId = FlinkConfig.config.getProperty("dev_groupId"); String devMode = FlinkConfig.config.getProperty("dev_mode"); Properties prop = new Properties(); prop.setProperty("bootstrap.servers", servers); prop.setProperty("group.id", groupId); prop.setProperty("auto.offset.reset", devMode); DataStreamSource sourceStream = env.addSource(new FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop)); sourceStream.print("最源端的数据sourceStream"); //2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据 SingleOutputStreamOperator baseInfoStream = sourceStream.map(new MapFunction() { @Override public BaseInfo2 map(String value) throws Exception { JSONObject jsonObject = JSON.parseObject(value); //获取到不同的服务器IP String serverIp = jsonObject.getString("ip"); //获取到不同的data的数据 String datas = jsonObject.getString("data"); String[] splits = datas.split("\n"); HashMap dataMap = new HashMap<>(); //将time填充到自定义类型中,用来判断同一个num的请求以及应答时间 String time = splits[0].substring(7, 19).replace("-", "").trim(); //将subData填充到自定义类型中,用来判断时请求还是应答 String subData = datas.substring(0, 10); for (int i = 0; i < splits.length; i++) { if (splits[i].contains("=")) { splits[i] = splits[i].replaceFirst("=", "&"); String[] temp = splits[i].split("&"); if (temp.length > 1) { dataMap.put(temp[0].toLowerCase(), temp[1]); } } } return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, System.currentTimeMillis()); } }).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime())); baseInfoStream.print("不加功能描述的 baseInfoStream"); //3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述 SingleOutputStreamOperator completeInfoStream = baseInfoStream.map(new MapFunction() { @Override public BaseInfo2 map(BaseInfo2 value) throws Exception { //拿到数据中携带的数字的action String actionId = value.getFuncId(); System.out.println("数据中的action编码是: " + actionId); String actionName = null;
table api定义rowtime未生效
各位老师好,以下是我的代码: | Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime()); tableEnv.createTemporaryView("midTable1",midTable); Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +"FROM TABLE(CUMULATE(\n" +" TABLE midTable1"+//" TABLE "+ midTable +" , DESCRIPTOR(eventTime)\n" +" , INTERVAL '60' SECOND\n" + " , INTERVAL '1' DAY))\n" +" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); | 我在流转换为表的时候,定义了流中的字段eventTime为rowtime,但是在执行下面的sqlQuery语句的时候,还是报错:Rowtime timestamp is not defined. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic 想请教一下各位老师解决之法 | | 小昌同学 | | ccc0606fight...@163.com |
回复: 回复:报错显示为bug
好滴呀 谢谢各位老师 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年5月16日 08:46 | | 收件人 | , | | 主题 | Re: 回复:报错显示为bug | Hi, 从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段 Best, Shammon FY On Mon, May 15, 2023 at 7:29 PM lxk wrote: 你好,从报错来看是类型不兼容导致的。 Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime" 可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换 At 2023-05-15 18:29:15, "小昌同学" wrote: | package job; import bean.BaseInfo; import bean.MidInfo; import bean.OutInfo; import bean.ResultInfo; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import config.FlinkConfig; import function.MyProcessFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.types.DataType; import org.apache.flink.util.OutputTag; import sink.Sink2Mysql; import utils.DateUtil; import utils.DateUtils; import utils.JdbcUtil; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.time.*; import java.util.Date; import java.util.HashMap; import java.util.Properties; public class RytLogAnly4 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //使用侧输出流 OutputTag requestStream = new OutputTag("requestStream") { }; OutputTag answerStream = new OutputTag("answerStream") { }; //1、连接测试环境kafka的数据 String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers"); String topicName = FlinkConfig.config.getProperty("dev_topicName"); String groupId = FlinkConfig.config.getProperty("dev_groupId"); String devMode = FlinkConfig.config.getProperty("dev_mode"); Properties prop = new Properties(); prop.setProperty("bootstrap.servers", servers); prop.setProperty("group.id", groupId); prop.setProperty("auto.offset.reset", devMode); DataStreamSource sourceStream = env.addSource(new FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop)); //{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"} //2、对源数据进行处理,生成baseInfo基类的数据 SingleOutputStreamOperator baseInfoStream = sourceStream.map(new MapFunction() { @Override public BaseInfo map(String value) throws Exception { JSONObject jsonObject = JSON.parseObject(value); //获取到不同的服务器IP String serverIp = jsonObject.getString("ip"); //获取到不同的data的数据 String datas = jsonObject.getString("data"); String[] splits = datas.split("\n"); HashMap dataMap = new HashMap<>(); //将time填充到自定义类型中,用来判断同一个num的请求以及相应时间 String time = splits[0].substring(7, 19); //将subData填充到自定义类型中,用来判断时请求还是应答 String subData = datas.substring(0, 10); for (int i = 0; i < splits.length; i++) { if (splits[i].contains("=")) { splits[i] = splits[i].replaceFirst("=", "&"); String[] temp = splits[i].split("&"); if (temp.length > 1) { dataMap.put(temp[0].toLowerCase(), temp[1]); } } } return new BaseInfo(dataMap.get("action"), serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData); } }); //3、使用process方法进行baseInfoStream流切割 SingleOutputStreamOperator tagStream = baseInfoStream.process(new MyProcessFunction(reques
回复:报错显示为bug
er.java:4396) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783) at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86) ... 22 more Process finished with exit code 1 | | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | lxk | | 发送日期 | 2023年5月15日 18:21 | | 收件人 | | | 主题 | Re:报错显示为bug | 你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。 在 2023-05-15 17:11:42,"小昌同学" 写道: 各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. “ flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作 | | 小昌同学 | | ccc0606fight...@163.com |
报错显示为bug
各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. “ flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作 | | 小昌同学 | | ccc0606fight...@163.com |
flink 状态设置
各位老师好,我这边使用的flink sql是" select funcId,funcIdDesc,serverIp,cast(min(maxTime-minTime) as varchar(200)) as minTime,pk from ( select a.funcId as funcId , a.funcIdDesc as funcIdDesc, a.serverIp as serverIp, b.outTime as maxTime, a.outTime as minTime, concat(a.funcId,a.serverIp) as pk from tableRequest a inner join tableAnswer b on a.handleSerialNo=b.handleSerialNo ) group by funcId,funcIdDesc,serverIp,pk" 考虑如果不对于状态进行管理,后续程序会出现问题,我这边想实现的状态管理是:我上述的这个sql计算的数据仅仅只是当天(24小时)的,等到第二天就把之前的全部状态全部清除掉,基于这样的场景我可以怎么设置什么参数管理状态,我自己设置参数为“tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));”,看官网的解释,感觉这样会有问题,idlestate是只要更新了就会重新设置过期时间,但是我想实现效果是不管是有咩有更新,只要不是属于今天的就全部清理掉。 | | 小昌同学 | | ccc0606fight...@163.com |
回复: 不同的流程使用不同的并行度
好滴呀 谢谢各位老师指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | yidan zhao | | 发送日期 | 2023年4月21日 10:50 | | 收件人 | | | 主题 | Re: 不同的流程使用不同的并行度 | 从哪方面考虑,主要根据每个算子的工作复杂性,复杂性越高自然设置越高的并发好点。 其次实际运行时,也可以根据反压情况找到瓶颈进行调整。 Shammon FY 于2023年4月21日周五 09:04写道: Hi DataStream作业设置并发度有两种方式 1. 在ExecutionEnvironment通过setParallelism设置全局并发 2. 在DataStream中通过setParallelism为指定的datastream计算设置并发度 Best, Shammon FY On Fri, Apr 21, 2023 at 8:58 AM 小昌同学 wrote: 各位老师好,请教一下关于flink的并行度的问题; 我现在数据上游是kafka(四个分区),经过Flink ETL处理后,实时落地到Kafka以及MYSQL,那我想在不同的阶段设置不同的并行度,这一块可以怎么使用,我使用的是DataStream API 还想请教一下就是关于并行度的这个设置,应该从哪些方面进行考虑啊,麻烦各位老师指教一下 | | 小昌同学 | | ccc0606fight...@163.com |
不同的流程使用不同的并行度
各位老师好,请教一下关于flink的并行度的问题; 我现在数据上游是kafka(四个分区),经过Flink ETL处理后,实时落地到Kafka以及MYSQL,那我想在不同的阶段设置不同的并行度,这一块可以怎么使用,我使用的是DataStream API 还想请教一下就是关于并行度的这个设置,应该从哪些方面进行考虑啊,麻烦各位老师指教一下 | | 小昌同学 | | ccc0606fight...@163.com |
回复:控制台打印出流式数据
好滴呀 谢谢老师 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jason_H | | 发送日期 | 2023年4月19日 16:06 | | 收件人 | flink中文邮件组 | | 主题 | 回复:控制台打印出流式数据 | 这个方法就可以打印在你本地的idea控制台里面,你试一下 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2023年4月19日 16:01 | | 收件人 | user-zh | | 抄送人 | user-zh | | 主题 | 回复:控制台打印出流式数据 | 这个print是将数据打印再flink的stud out吧,我现在是再本地进行调试,想在本地上打印出来结果 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jason_H | | 发送日期 | 2023年4月19日 15:58 | | 收件人 | flink中文邮件组 | | 主题 | 回复:控制台打印出流式数据 | hi,你好 你应该使用 stream.print() 来打印流中的数据 不要system out 输出 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2023年4月19日 15:51 | | 收件人 | user-zh | | 主题 | 控制台打印出流式数据 | 各位老师好,请教一个问题,就是上游的数据源是Kafka,编辑工具是idea,再new FlinkKafkaConsumer后,得到一条流stream,我想看一下流中的数据,直接 System.out.println(stream.toString); 但是从控制台打印结果来看,打印出来的还是地址值,请各位老师指导一下 | | 小昌同学 | | ccc0606fight...@163.com |
回复:flink命令行提交作业读取不到properties配置文件
我这边的做法是将配置文件也当作一条流进行读取,程序会自动读取,不需要再任务启动的时候指定;希望对你有帮助呀 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jason_H | | 发送日期 | 2023年4月19日 15:57 | | 收件人 | flink中文邮件组 , user-zh-subscribe | | 主题 | flink命令行提交作业读取不到properties配置文件 | hi,大家好 我在使用命令行提交任务时,发现任务刚起来就会报错,根据错误发现没有读去到jar包中resource目录下的properties配置文件,导致在使用redis时,初始化报错 提交命令如下: flink run -c com.test..etl.OdsChangeApplication /opt/dobrain/app/etl/test-etl-0.0.2-SNAPSHOT.jar \ -p 4 \ -job-name test-etl \ 此处没有添加redis配置参数,但是配置文件中已经有默认的,提交运行后报错: java.lang.IllegalArgumentException: template not initialized; call afterPropertiesSet() before using it at org.springframework.util.Assert.isTrue(Assert.java:121) ~[spring-core-5.2.14.RELEASE.jar:5.2.14.RELEASE] at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:204) ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:53) ~[spring-data-redis-2.3.9.RELEASE.jar:2.3.9.RELEASE] at com.test.etl.client.RedisService.getStringValue(RedisService.java:30) ~[classes/:?] at com.test.etl.manager.impl.RedisChangeManager.getCustId(RedisChangeManager.java:53) ~[classes/:?] at com.test.etl.transformation.process.MsgHandleProcess.processElement(MsgHandleProcess.java:46) ~[classes/:?] at com.test.etl.transformation.process.MsgHandleProcess.processElement(MsgHandleProcess.java:21) ~[classes/:?] at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.2.jar:1.15.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.2.jar:1.15.2] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201] 我尝试在命令行添加了redis的参数,启动任务测试发现也会报如下的错误 请问大佬们,这个怎么解决,就是在命令行提交任务,怎么可以读取到jar包中定义的properties配置文件呢 | | Jason_H | | hyb_he...@163.com |
回复:控制台打印出流式数据
这个print是将数据打印再flink的stud out吧,我现在是再本地进行调试,想在本地上打印出来结果 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jason_H | | 发送日期 | 2023年4月19日 15:58 | | 收件人 | flink中文邮件组 | | 主题 | 回复:控制台打印出流式数据 | hi,你好 你应该使用 stream.print() 来打印流中的数据 不要system out 输出 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2023年4月19日 15:51 | | 收件人 | user-zh | | 主题 | 控制台打印出流式数据 | 各位老师好,请教一个问题,就是上游的数据源是Kafka,编辑工具是idea,再new FlinkKafkaConsumer后,得到一条流stream,我想看一下流中的数据,直接 System.out.println(stream.toString); 但是从控制台打印结果来看,打印出来的还是地址值,请各位老师指导一下 | | 小昌同学 | | ccc0606fight...@163.com |
控制台打印出流式数据
各位老师好,请教一个问题,就是上游的数据源是Kafka,编辑工具是idea,再new FlinkKafkaConsumer后,得到一条流stream,我想看一下流中的数据,直接 System.out.println(stream.toString); 但是从控制台打印结果来看,打印出来的还是地址值,请各位老师指导一下 | | 小昌同学 | | ccc0606fight...@163.com |
流数据转化为json
你好,请问一下上游的数据是 SingleOutputStreamOperator outPutInfoStream = keyedStream.process(new KeyStreamFunc()); 数据样式为:InPutInfo[phone='123456',workId='001'] 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案; 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点 | | 小昌同学 | | ccc0606fight...@163.com |
回复:flink sink web ui显示为Sink: Unnamed
好滴,谢谢各位老师 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | hjw | | 发送日期 | 2023年4月14日 16:38 | | 收件人 | | | 主题 | Re:flink sink web ui显示为Sink: Unnamed | 可以在算子后面调用.name()方法指定名称,方法参数就是算子名称。 比如需sink的流为stream stream.sinkTo(Sink算子).name("sink-name") -- Best, Hjw 在 2023-04-14 16:26:35,"小昌同学" 写道: 我将流式数据输出到mysql,查看flink 自带的web ui界面,有一个sink节点显示为Sink: Unnamed ,这个针对sink节点可以命名嘛 | | 小昌同学 | | ccc0606fight...@163.com |
flink sink web ui显示为Sink: Unnamed
我将流式数据输出到mysql,查看flink 自带的web ui界面,有一个sink节点显示为Sink: Unnamed ,这个针对sink节点可以命名嘛 | | 小昌同学 | | ccc0606fight...@163.com |
回复:打印不同流怎么进行区分
好滴,谢谢您 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | 17610775726<17610775...@163.com> | | 发送日期 | 2023年4月14日 10:27 | | 收件人 | user-zh@flink.apache.org | | 抄送人 | user-zh | | 主题 | 回复:打印不同流怎么进行区分 | Hi Print 方法是可以传入一个参数的,用来标识某个流,比如 print(“a”); print(“b"); Best JasonLee 回复的原邮件 ---- | 发件人 | 小昌同学 | | 发送日期 | 2023年04月14日 09:46 | | 收件人 | user-zh | | 主题 | 打印不同流怎么进行区分 | 你好,请问一下再一个程序中,有流与流之间的转换,比如说流A转换为流B,那我想看看流A,也想看看流B,请问我该怎么实现,直接print的话,再控制面板会乱掉 | | 小昌同学 | | ccc0606fight...@163.com |
打印不同流怎么进行区分
你好,请问一下再一个程序中,有流与流之间的转换,比如说流A转换为流B,那我想看看流A,也想看看流B,请问我该怎么实现,直接print的话,再控制面板会乱掉 | | 小昌同学 | | ccc0606fight...@163.com |
flink sql upsert mysql问题
'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败 'json.ignore-parse-errors' = 'true' -- 解析失败跳过 ); create view v_dm_cust_oact_prog_ri as select cust_id ,cust_nme ,cust_mob_tel ,cust_curr_step ,cust_curr_step_num ,cust_curr_step_occu_tm ,user_id ,FROM_UNIXTIME(UNIX_TIMESTAMP()) AS tech_sys_time from ( select cust_id ,cust_nme ,cust_mob_tel ,cust_curr_step ,cust_curr_step_num ,cust_curr_step_occu_tm ,user_id ,ROW_NUMBER() OVER(PARTITION BY user_id,cust_curr_step ORDER BY cust_curr_step_occu_tm DESC) AS rn from ( select t2.client_id as cust_id ,t2.client_name as cust_nme ,t2.mobile_tel as cust_mob_tel ,case when t2.active_datetime is not null then '开户成功' when t5.business_flag_audit in ('1003','1011') then '人工审核' when t1.business_flag = '22114' then '提交申请' when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then '视屏验证' when t1.business_flag = '22113' then '绑定三方存管' when t1.business_flag = '22112' then '设置密码' when t1.business_flag = '22109' then '协议签署' when t1.business_flag = '22108' then '开通账户选择' when t1.business_flag = '22110' then '风险评测' when t1.business_flag = '22106' then '填写基本资料' when t1.business_flag = '22107' then '上传身份证' when t1.business_flag = '22111' then '选择营业部' when t1.business_flag = '12100' then '新开户:注册申请开户' end as cust_curr_step ,case when t2.active_datetime is not null then 13 when t5.business_flag_audit in ('1003','1011') then 12 when t1.business_flag = '22114' then 11 when t1.business_flag in ('22115','33500') or t5.business_flag_video in ('1200','1202','1203') then 10 when t1.business_flag = '22113' then 9 when t1.business_flag = '22112' then 8 when t1.business_flag = '22109' then 7 when t1.business_flag = '22108' then 6 when t1.business_flag = '22110' then 5 when t1.business_flag = '22106' then 4 when t1.business_flag = '22107' then 3 when t1.business_flag = '22111' then 2 when t1.business_flag = '12100' then 1 end as cust_curr_step_num ,case when t2.active_datetime is not null then t2.active_datetime when t5.business_flag_audit is not null then t5.curr_datetime when t5.business_flag_video is not null then t5.create_datetime else t1.curr_datetime end as cust_curr_step_occu_tm ,t1.user_id from ( select curr_datetime ,user_id ,business_flag from ( select replace(curr_datetime,'-','') as curr_datetime ,user_id ,business_flag ,ROW_NUMBER() OVER(PARTITION BY user_id,business_flag ORDER BY curr_datetime DESC) AS rn from dm_crh_cust_oact_rec_ri where business_flag in ('22114','22113','22112','22109','22108','22110','22106','22107','22111','12100','22115','33500') )t where rn = 1 ) t1 left join ( select user_id ,client_name ,mobile_tel ,replace(substr(active_datetime,1,19),'-','') as active_datetime ,client_id from dm_crh_cust_info_ri ) t2 on t1.user_id = t2.user_id left join ( select t1.user_id ,t1.join_position_str ,replace(t1.create_datetime,'-','') as create_datetime ,t1.business_flag AS business_flag_video ,t2.business_flag AS business_flag_audit ,replace(t2.curr_datetime,'-','') as curr_datetime from dm_crh_user_vidro_ri t1 left join dm_crh_audit_rec_ri t2 on t1.join_position_str = t2.request_no where t1.business_flag in ('1200','1202','1203') or t2.business_flag in ('1003','1011') ) t5 on t1.user_id = t5.user_id ) t where cust_curr_step is not null ) t where rn = 1 ; insert into dm_cust_oact_prog_ri_print select cust_id ,cust_nme ,cust_mob_tel ,cust_curr_step ,cust_curr_step_num ,cust_curr_step_occu_tm ,user_id ,tech_sys_time from v_dm_cust_oact_prog_ri ; insert into dm_cust_oact_prog_ri select cust_id ,cust_nme ,cust_mob_tel ,cust_curr_step ,cust_curr_step_num ,cust_curr_step_occu_tm ,user_id ,tech_sys_time from v_dm_cust_oact_prog_ri】 | | 小昌同学 | | ccc0606fight
回复: flink写入mysql数据异常
好滴呀,谢谢您的建议; https://www.yuque.com/g/echochangtongxue/yxxdbg/iyfqa9fh34i5lssu/collaborator/join?token=KZCQVX5pqH3rmPNP# 邀请你共同编辑文档《Flink SQL写入到mysql的问题》 我创建了一个语雀,我将代码以及问题都写在文档里了,麻烦大佬们帮忙看一下问题呀 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年3月24日 13:08 | | 收件人 | | | 主题 | Re: flink写入mysql数据异常 | Hi 你可以将问题描述和sql放在一个外部文档,例如google文档,然后将文档连接发在邮件里 Best, Shammon FY On Fri, Mar 24, 2023 at 10:58 AM 孙冬燕 wrote: 退订 -- 发件人:小昌同学 发送时间:2023年3月24日(星期五) 10:57 收件人:user-zh 抄 送:user-zh 主 题:回复: flink写入mysql数据异常 您好, 可能是我这边上传附件的方式不对,我场景描述的不够准确; 您看是否方便加一个微信呢【15956076613】,我将文档和截图发您,帮忙看一下; 谢谢大佬的指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jane Chan | | 发送日期 | 2023年3月23日 20:40 | | 收件人 | | | 主题 | Re: flink写入mysql数据异常 | 附件还是没有收到哦. Flink SQL 支持 INSERT INTO table_identifier (column_identifier1 [, column_identifier2, ...]) 插入指定列, 具体语法可以参考 [1] [1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries < https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries On Thu, Mar 23, 2023 at 5:35 PM 小昌同学 wrote: 您好,我刚刚重新上传了附件;是的,Flink SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导 小昌同学 ccc0606fight...@163.com < https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D> < https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D> 回复的原邮件 发件人 Jane Chan 发送日期 2023年3月23日 15:42 收件人 主题 Re: flink写入mysql数据异常 Hi, 没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键, 并且与数据库中物理表主键保持一致. 可以参考 [1]. [1] https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86 < https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86 On Thu, Mar 23, 2023 at 2:54 PM 小昌同学 wrote: 大佬,你好,代码上传在附件中了; 就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法? 小昌同学 ccc0606fight...@163.com < https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D < https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D 回复的原邮件 发件人 Jane Chan 发送日期 2023年3月23日 14:23 收件人 主题 Re: flink写入mysql数据异常 可以把完整 SQL 发出来看看 祝好! Jane On Thu, Mar 23, 2023 at 1:39 PM 小昌同学 wrote: 使用flink sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert ,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊 | | 小昌同学 | | ccc0606fight...@163.com |
回复: flink写入mysql数据异常
您好, 可能是我这边上传附件的方式不对,我场景描述的不够准确; 您看是否方便加一个微信呢【15956076613】,我将文档和截图发您,帮忙看一下; 谢谢大佬的指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jane Chan | | 发送日期 | 2023年3月23日 20:40 | | 收件人 | | | 主题 | Re: flink写入mysql数据异常 | 附件还是没有收到哦. Flink SQL 支持 INSERT INTO table_identifier (column_identifier1 [, column_identifier2, ...]) 插入指定列, 具体语法可以参考 [1] [1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/insert/#insert-from-select-queries On Thu, Mar 23, 2023 at 5:35 PM 小昌同学 wrote: 您好,我刚刚重新上传了附件;是的,Flink SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导 小昌同学 ccc0606fight...@163.com <https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D> 回复的原邮件 发件人 Jane Chan 发送日期 2023年3月23日 15:42 收件人 主题 Re: flink写入mysql数据异常 Hi, 没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键, 并且与数据库中物理表主键保持一致. 可以参考 [1]. [1] https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86 On Thu, Mar 23, 2023 at 2:54 PM 小昌同学 wrote: 大佬,你好,代码上传在附件中了; 就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法? 小昌同学 ccc0606fight...@163.com < https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D 回复的原邮件 发件人 Jane Chan 发送日期 2023年3月23日 14:23 收件人 主题 Re: flink写入mysql数据异常 可以把完整 SQL 发出来看看 祝好! Jane On Thu, Mar 23, 2023 at 1:39 PM 小昌同学 wrote: 使用flink sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert ,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊 | | 小昌同学 | | ccc0606fight...@163.com |
回复: flink写入mysql数据异常
您好,我刚刚重新上传了附件;是的,Flink SQL已经支持了Upsert模式,但是这种更新都是行级别的更新,我想要实现仅仅只是变动一行数据中的部分字段。还望大佬指导 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jane Chan | | 发送日期 | 2023年3月23日 15:42 | | 收件人 | | | 主题 | Re: flink写入mysql数据异常 | Hi, 没有看到附件哦. 回到你的问题, Flink SQL 目前支持以 Upsert 模式写入 MySQL, 前提是 Sink 表的 DDL 声明主键, 并且与数据库中物理表主键保持一致. 可以参考 [1]. [1] https://github.com/apache/flink-connector-jdbc/blob/main/docs/content.zh/docs/connectors/table/jdbc.md#%E9%94%AE%E5%A4%84%E7%90%86 On Thu, Mar 23, 2023 at 2:54 PM 小昌同学 wrote: 大佬,你好,代码上传在附件中了; 就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法? 小昌同学 ccc0606fight...@163.com <https://dashi.163.com/projects/signature-manager/detail/index.html?ftlId=1=%E5%B0%8F%E6%98%8C%E5%90%8C%E5%AD%A6=ccc0606fighting%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsmed9b2013afa816e025ae07760d572391.jpg=%5B%22ccc0606fighting%40163.com%22%5D> 回复的原邮件 发件人 Jane Chan 发送日期 2023年3月23日 14:23 收件人 主题 Re: flink写入mysql数据异常 可以把完整 SQL 发出来看看 祝好! Jane On Thu, Mar 23, 2023 at 1:39 PM 小昌同学 wrote: 使用flink sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert ,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊 | | 小昌同学 | | ccc0606fight...@163.com |
回复: flink写入mysql数据异常
大佬,你好,代码上传在附件中了; 就是我想实现flink sql写MySQL时能支持update吗 类似ON DUPLICATE KEY UPDATE 的语法? | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jane Chan | | 发送日期 | 2023年3月23日 14:23 | | 收件人 | | | 主题 | Re: flink写入mysql数据异常 | 可以把完整 SQL 发出来看看 祝好! Jane On Thu, Mar 23, 2023 at 1:39 PM 小昌同学 wrote: 使用flink sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert ,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊 | | 小昌同学 | | ccc0606fight...@163.com |
flink写入mysql数据异常
使用flink sql多表关联实时的将数据写入到mysql,mysql中定义了联合主键,查看日志发现为啥相同的数据插入到mysql表中,一条是insert ,另外一条是delete啊,我想实现的是upsert,这样该怎么操作啊 | | 小昌同学 | | ccc0606fight...@163.com |
Re: flink sql
好滴 谢谢大佬呀 | | 小昌同学 | | ccc0606fight...@163.com | Replied Message | From | 17610775726<17610775...@163.com> | | Date | 3/3/2023 15:55 | | To | user-zh@flink.apache.org | | Cc | user-zh | | Subject | Re:flink sql | Hi 可以通过设置 pipeline.operator-chaining = false 来实现。 Best JasonLee Replied Message | From | 小昌同学 | | Date | 03/3/2023 15:50 | | To | user-zh | | Subject | flink sql | 各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能 | | 小昌同学 | | ccc0606fight...@163.com |
flink sql
各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能 | | 小昌同学 | | ccc0606fight...@163.com |
flink状态恢复
我有一个flink任务运行很久了,统计的指标是一些聚合值,但是现在业务想要增加字段,请问一下这个场景大家是怎么处理的啊 | | 小昌同学 | | ccc0606fight...@163.com |
回复: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
截图一下日志报错的exception看看 | | 小昌 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | yidan zhao | | 发送日期 | 2022年9月16日 14:20 | | 收件人 | user-zh | | 主题 | Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务 | 什么部署模式。 Summer 于2022年9月16日周五 13:57写道: Flink版本:1.13.3 我有一个Flink Sql的任务,也生成了checkpoint,但是执行过程出现Execption,导致整个Flink JobManger无法启动。 我再重启Flink的时候,这个FlinkSql任务由于一直抛异常导致Flink进程启动不起来。 请问有什么办法取消这个任务。
回复:flink table API使用
感谢感谢大佬指点 | | 应聘者昌呈呈 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Xuyang | | 发送日期 | 2022年9月6日 00:03 | | 收件人 | | | 主题 | Re:flink table API使用 | Hi, 可以类似这样写 “.filter($("a").isGreater(10)) "。 更多的使用方法可以参考[1] [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java -- Best! Xuyang 在 2022-09-05 20:53:03,"小昌同学" 写道: Table result = kafka_item.groupBy($("trans_number")) .select($("trans_number"),$("sales_amount").sum().as("sum_amount")) .filter($("sum_amount ")); 各位大佬 请教一个问题 我这边想通过flink table API 达到这样一个效果: 根据trans_number进行分组 然后对另一个字段进行sum计算 然后我想最后进行过滤的时候 过滤出来这个sum值大于100的 我这个后续怎么使用API啊 这个filter算子咋用呀 | | 小昌 | | ccc0606fight...@163.com |
flink table API使用
Table result = kafka_item.groupBy($("trans_number")) .select($("trans_number"),$("sales_amount").sum().as("sum_amount")) .filter($("sum_amount ")); 各位大佬 请教一个问题 我这边想通过flink table API 达到这样一个效果: 根据trans_number进行分组 然后对另一个字段进行sum计算 然后我想最后进行过滤的时候 过滤出来这个sum值大于100的 我这个后续怎么使用API啊 这个filter算子咋用呀 | | 小昌 | | ccc0606fight...@163.com |
flink sql解析kafka数据
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值 我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true' 但是我在客户端执行的时候 发现识别不到这个字段 有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛 CREATE TABLE ccc_test_20220630_2 ( trans_number STRING, end_timestamp STRING, return_flagSTRING, commodity_type STRING ) COMMENT '中台交易流水小票头' WITH ( 'connector' = 'kafka', 'topic' = 'yh_rme_soc_stream_prod-tlog_header', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'ccc_test_20220630_2', 'properties.request.timeout.ms' = '6', 'format' = 'json', 'scan.startup.mode' = 'group-offsets', -- 'scan.startup.mode' = 'timestamp', -- 'scan.startup.timestamp-millis' = '165373920', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' 'json.infer-schema.flatten-nested-columns.enable'='true' ); | | 小昌 | | ccc0606fight...@163.com |
Unaligned Checkpoint
大佬们可以说说Unaligned Checkpoint的实现吗 看了不少文档 没有太看懂 我如果想在sql里面实现 这个该怎么设置啊 请大佬们指教 | | 小昌同学 | | ccc0606fight...@163.com |