Flink写入CK数据丢失问题

2022-06-01 Thread lxk
各位,请教个问题 目前使用flink往ck写入数据,使用的是datastream api以及rocksdb状态后端,程序中了开了两个窗口,都是10秒级别。同时还使用了sql进行group by 求和,求和的操作没有加窗口,同时streamtableenv 设置了状态生存时间为10s. 在跟离线端对比数据的时候发现,一段时间内的数据跟离线差异不大,从0点-17点(数据的事件时间),但是18点(事件时间)以后的数据实时端差异量特别大。 目前在webui上发现整个管理内存的使用率已经占满,不知道是否跟这个有关系。 还有一点现象是,今天的数据我们对比了ck上实时的表(正确的),总体数据量还是要小

Re:Flink写入CK数据丢失问题

2022-06-01 Thread lxk
补充一下图片 https://s2.loli.net/2022/06/02/C5it4rPFgmJlopZ.png https://s2.loli.net/2022/06/02/3ri2HIv1RsAawBW.png https://s2.loli.net/2022/06/02/efVWPvXCYFwhgTp.png https://s2.loli.net/2022/06/02/9UptbNaWvs7xwXC.png 在 2022-06-02 11:38:24,"lxk" 写道: 各位,请教个问题 目前使用flink往c

Flink 使用interval join数据丢失疑问

2022-06-09 Thread lxk
flink 版本:1.14.4 目前在使用flink interval join进行数据关联,在测试的时候发现一个问题,就是使用interval join完之后数据会丢失,但是使用sql api,直接进行join,数据是正常的,没有丢失。 水印是直接使用kafka 自带的时间戳生成watermark 以下是代码 ---interval join SingleOutputStreamOperator headerFullStream = headerFilterStream.keyBy(data -> data.getId()) .intervalJoin(filterItemSt

Re:Re: Flink 使用interval join数据丢失疑问

2022-06-09 Thread lxk
amt, bom_type, last_updated_at, display_qty, is_first_flag]) 在 2022-06-10 11:02:56,"Shengkai Fang" 写道: >你好,能提供下具体的 plan 供大家查看下吗? > >你可以直接 使用 tEnv.executeSql("Explain JSON_EXECUTION_PLAN >").print() 打印下相关的信息。 > >Best, >Shengkai > &

Re:Re:Re:Re: Flink 使用interval join数据丢失疑问

2022-06-09 Thread lxk
考文档[1]。可以试下使用SQL interval >join会不会丢数据(注意设置state的ttl),从而判断是数据的问题还是datastream api的问题。 > > > > >[1] >https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins > > > > > > > >-- > >Best! >Xuy

Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问

2022-06-10 Thread lxk
义和实现上都是有区别的。所以你直接拿datastream >api的interval join和sql上的普通join结果对比,其实是有问题的。所以我之前的建议是让你试下让sql也使用interval >join,这样双方才有可比性。 > > >另外sql中设置的table.exec.state.ttl这个参数,只是代表的state会20s清空过期数据,但我看你要比较的时间窗口是-10s和20s,貌似也不大一样。 > > > > >-- > >Best! >Xuyang > >

Re:Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问

2022-06-10 Thread lxk
markAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)]) +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime]) +- TableSourceScan(table=[[default_catalog, default_data

Re:Flink JobManager 节点 JVM Metaspace 过高

2022-06-10 Thread lxk
可以把堆栈的日志打印出来看看 在 2022-06-10 18:15:53,"Summer" 写道: 使用 FinkUI 上传 Flink 任务 Jar 时,任务启动失败。 这时候JVM Metaspace就会异常增加。 这是什么原因?

Re:Re:Re: Re: Flink 使用interval join数据丢失疑问

2022-06-14 Thread lxk
me,也就是说,如果右流的 event >>time 的更新会影响左流的数据清理。比如说右流的时间点到了 12:00,join 条件要求左流的时间不会晚于右流的时间 1h,那么左流 >>11:00之前的数据都可以被清理了。 >> >>对于第三点,我觉得是不能的。目前的 inner join + state 清理无法覆盖 event time 的window join 的。 >> >>best, >>Shengkai >> >>lxk7...@163.com 于2022年6

Re:Re: Re:Re: Re: Flink 使用interval join数据丢失疑问

2022-06-15 Thread lxk
; 不合理的 watermark 设置在 interval join 就会导致丢数据。设置 ttl 情况下,如果某个 key >> 的数据频繁访问情况下,那么这个数据就不会过期。 >> >> > 我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志。 >> >> 我记得日志是会打印相关的日志。能提一些相关的日志吗? >> >> best, >> Shengkai >> >> lxk 于2022年6月14日周二 20:04写道

Re:使用join+聚合时,checkpoint异常

2022-06-20 Thread lxk
你好,图片挂了,可以尝试使用图床工具上传图片。 在 2022-06-21 09:42:54,"amber_...@qq.com.INVALID" 写道: 您好! 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务; 当我提交普通数据同步任务时,一切正常; 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed Memory使用率始终是100%; 以下是我的checkpoint配置:

Flink interval join 水印疑问

2022-07-05 Thread lxk
在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下 https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png 官方文档中说会从两个流中的 timestamp 中取最大值,看了下源码确实是这样 https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png 我的问题是: 1.这里的timestamp和watermark有什么区别? 2.interval join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的time

Re:Re: Flink interval join 水印疑问

2022-07-07 Thread lxk
>watermark是从source部分生成的水印个,然后向后传播。 > >以分窗口为例,数据记录的timestamp用于决定数据划分入哪个窗口。 >watermark用于决定窗口啥时候闭合,比如窗口是0-5s,那么当watermark达到5s的时候,窗口就会闭合。 > >考虑数据不一定能及时到达,可以让watermark=max(timestamp)-30s。30s即可容忍给的数据乱序的程度。 > >lxk 于2022年7月6日周三 13:36写道: >> >> 在使用int

Re:Re: 如何在flink中正确使用外部数据库连接

2022-07-25 Thread lxk
谢谢 我现在使用的是直连的方式,也没有关闭preparedstatement和resultset,但是没有发生过内存泄漏的问题,请问了解这背后的原因吗 在 2022-07-25 13:53:42,"Lijie Wang" 写道: >Hi, >根据我的经验,使用连接池时,至少需要及时关掉 statement/ResultSet,否则查询的结果会一直缓存,会有内存泄漏的问题。 > >Best, >Lijie > >lxk7...@163.com 于2022年7月23日周六 15:34写道: > >> >> 目前的项目中,需要使用外部数据库进行实时的look

Flink SQL 如何优化以及处理反压

2023-01-30 Thread lxk
Flink版本:1.16.0 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 具体代码如下: select \ header.id as id, \ LAST_VALUE(header.order_status), \ LAST_VALUE(header.customer_id), \ LAST_VALUE(header.shop_id), \ LAST_VALUE(header.parent_order_id), \ LAST_VALUE(header.order_at), \ LAST_VALUE(header.pay_at), \ LAST_VALUE(head

Re:Re: Flink SQL 如何优化以及处理反压

2023-01-31 Thread lxk
qq.com.invalid> 于2023年1月31日周二 17:22写道: > >> 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了 >> >> >> 发件人: lxk >> 发送时间: 2023年1月31日 15:16 >> 收件人: user-zh@flink.apache.org >> 主题: Flink SQL 如何优化以及处理反压 >> >> Flink版本:1.16.0 >> 目前在使用Flink

Flink程序内存Dump不了

2023-02-13 Thread lxk
Flink version:1.16 java version: jdk1.8.0_251 问题:最近上线的Flink程序,频繁young gc,几秒一次,在调整了新生代大小之后,还是没有解决,目前整个jvm堆大小是3.57g。因此想通过程序内存情况来分析哪里问题有问题,我们通过yarn上的applicationId,使用ps -ef|grep 1666758697316_2639108,找到对应的pid,最后执行 jmap -dump:format b,file=user.dump 26326 命令生成dump文件,但我们测试了很多个程序,只要一开始dump,都会对线上程序产生影响

Re:Flink on yarn 运行一段时间出现 TaskManager with id is no longer reachable

2023-02-15 Thread lxk
你好,可以dump下内存分析 在 2023-02-16 10:05:19,"Fei Han" 写道: >@all >大家好!我的Flink 版本是1.14.5。CDC版本是2.2.1。在on yarn 运行一段时间后会出现如下报错: >org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id >container_e506_1673750933366_49579_01_02(hdp-server-010.yigongpin.com:8041) > is n

Flink1.16写入kafka 报错:Cluster authorization failed.

2023-02-17 Thread lxk
Flink版本:1.16 目前公司针对Flink版本进行升级,从Flink1.14升级到Flink1.16,代码没做任何调整,但是在写入kafka的时候报错: 2023-02-17 15:03:19 org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(Tra

Re:Re: Flink1.16写入kafka 报错:Cluster authorization failed.

2023-02-19 Thread lxk
gt; >从`Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: >Cluster authorization failed.`这个错误看起来像是权限错误,可以你检查下是否有权限问题 > >Best, >Shammon > >On Fri, Feb 17, 2023 at 6:29 PM lxk wrote: > >> Flink版本:1.16 >> 目前公司针对Flink版本进行升级,从Flink1.14升级到Flink1.16,代码没做任何调整,但是在写入kafka的时候报错: >> 202

Re:Re: Flink程序内存Dump不了

2023-02-19 Thread lxk
我尝试调整了参数,具体数值如下 akka.ask.timeout: 900s 但还是报同样的错 在 2023-02-17 17:32:51,"Guo Thompson" 写道: >可能是jm 和 tm之间的心跳时间太短了, dump的过程会stop the world,tm就不响应jm的heartbeat了; > >lxk 于2023年2月14日周二 14:32写道: > >> Flink version:1.16 >> java version: jdk1.8.0_251 >&g

Re:Re: Re: Flink程序内存Dump不了

2023-02-21 Thread lxk
>> >> [1] >> >> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options >> >> Best, >> Weihua >> >> >> On Mon, Feb 20, 2023 at 1:58 PM lxk wrote: >> >> > 我尝试调整了参数,具体数值如下

Flink广播流状态清理策略不生效

2023-05-14 Thread lxk
flink版本:1.14 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。 在主程序中,我设置了状态过期策略: SingleOutputStreamOperator baiduStream = env.addSource(adBaiduClick).map(data -> JSON.parseObject(data, AdvertiseClick.class)).name("BaiDuAdClick"); MapStateDescriptor baiduInfoMap = new MapStateDescriptor<>("advertiseInf

Re:Re: Flink广播流状态清理策略不生效

2023-05-14 Thread lxk
eam/fault-tolerance/state/#state-time-to-live-ttl>对 >State TTL 的描述; > >On Mon, May 15, 2023 at 11:05 AM lxk wrote: > >> flink版本:1.14 >> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。 >> 在主程序中,我设置了状态过期策略: >>SingleOutputStreamOperator baiduStream = >&g

Re:Re: Re: Flink广播流状态清理策略不生效

2023-05-15 Thread lxk
gt;On Mon, May 15, 2023 at 1:41 PM lxk wrote: > >> 这么看来,广播流好像不适合在生产中使用,状态会无限止的增长。这块官方有计划增加ttl功能吗。 >> 或者使用广播流的时候有没有什么能够手动清理状态的方法? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >&

Re:报错显示为bug

2023-05-15 Thread lxk
你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。 在 2023-05-15 17:11:42,"小昌同学" 写道: >各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: >org.apache.flink.api.common.InvalidProgramException: Table program cannot be >compiled. This is a bug. Please file an issue. “ >flink使用版本为1.14,请问一下有相关社

Re:回复:报错显示为bug

2023-05-15 Thread lxk
thodInvocation(UnitCompiler.java:4423) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCo

Re:回复: flink 窗口触发计算的条件

2023-05-24 Thread lxk
你好,可以先看看官方文档中关于事件时间和水印的介绍 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/ 如果你发了多条数据,但是都是同样数据的话,水印没有推进,窗口就不会触发 在 2023-05-25 10:00:36,"小昌同学" 写道: >是的 我发送了很多数据,发现窗口还是没有触发 > > >| | >小昌同学 >| >| >ccc0606fight...@163.com >| > 回复的原邮件 >| 发件人 | yidan

Flink使用精准一次写入kafka报错

2023-05-28 Thread lxk
项目中使用精准一次语义写入kafka,代码和配置如下: Properties producerProperties = MyKafkaUtil.getProducerProperties(); KafkaSink kafkaSink = KafkaSink.builder() .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server")) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic(Event2Kafka.para

Flink使用精准一次写入kafka报错

2023-05-28 Thread lxk
上封邮件发错了,重新发一下。项目中使用精准一次语义写入kafka,代码和配置如下: 写入代码如下: Properties producerProperties = MyKafkaUtil.getProducerProperties(); KafkaSink kafkaSink = KafkaSink.builder() .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server")) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .se

Re:flink jdbcsink 连接数的问题

2023-05-29 Thread lxk
hi, jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。 关于连接数,则是跟你的并行度有关。 在 2023-05-30 13:55:57,"小昌同学" 写道: >各位老师,请教一下关于flink jdbcsink 连接数的问题; >我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀; >谢谢各位老师的指导 > >| >outPutInfoStream.addSink(J

Re:Flink cep busy

2023-07-11 Thread lxk
你好,整个程序有反压吗 在 2023-07-10 15:32:44,"jiaot...@mail.jj.cn" 写道: >Hello, > 我定义了一个pattern (a->b->c->d->e->f->g)在10分钟内匹配,通过在WebUI上查看任务很快在cep节点 > busy(max)100%,我发现通过增加cep节点的并发度并不能解决问题,且checkpoint随着时间的推移状态大小越来越大,数据应该存在大量堆积。数据源同时消费4个kafka > topic > (setTopics),采用默认水位线间隔时间,我发现4个to

flink如何正确使用mybatis

2023-07-17 Thread lxk
在flink内需要使用mybatis做些简化查询的工作,目前我的使用方式如下 public class MybatisUtil { private static final Logger LOGGER = LogFactory.createNewLogger("MybatisUtil"); private static ThreadLocal tl = new ThreadLocal(); private static SqlSessionFactory factory = null; //private static SqlSession sqlSession

Flink窗口状态清除疑问

2023-07-24 Thread lxk
相关配置: Flink:1.16 | Checkpointing Mode | Exactly Once | | Checkpoint Storage | FileSystemCheckpointStorage | | State Backend | EmbeddedRocksDBStateBackend | | Interval | 8m 0s | 我有一个程序,主要是用来统计一些热门商品之类的数据 具体代码如下: .keyBy(data -> data.getShopId() + data.getYh_productid()) .window(TumblingEventTimeWi

Re:RE: flink如何正确使用mybatis

2023-07-26 Thread lxk
好的,谢谢老师 在 2023-07-26 21:04:20,"Jiabao Sun" 写道: >SqlSession 需要关闭,建议使用 SqlSessionManager,可以不用手动关闭 SqlSession。 > > >On 2023/07/18 02:13:16 lxk wrote: >> 在flink内需要使用mybatis做些简化查询的工作,目前我的使用方式如下 >> >> public class MybatisUtil { &