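Thank you very much for the reply.

1. Regarding the watermark, I will run more tests. I will also test whether the interval join still loses data when it uses processing time.

2. Regarding the interval join, my understanding was that the range of data it can match is larger than the inner join's, so its results should be more accurate. The results show the opposite, data loss, which really surprised me and somewhat overturned my understanding. I also have a new guess: the two streams have different data volumes, and that might also cause data loss. The left stream is at order granularity, while the right stream is at order-item granularity and has far more records. I would expect the right stream to be processed a bit more slowly, so time on the two sides may advance unevenly. But that raises a new question: shouldn't the inner join be affected in the same way?

3. One more point I may not have explained clearly: in SQL I use an inner join without registering a watermark. Is the join between the two streams then defined in terms of processing time? And is the expiration of the table state also defined in terms of processing time?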
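A minimal plain-Java model of the event-time rules behind point 2. It is a sketch under assumptions, not Flink's actual code. As far as I know, a two-input operator such as an interval join takes the minimum of its two input watermarks. A larger or slower stream therefore holds the join's event time back; it does not let the other side run ahead. What gets discarded is a record whose own timestamp is already behind that combined watermark: the 1.14 DataStream `IntervalJoinOperator` drops such records as late.

The names `TwoInputClock`, `BoundedDelay` and `LateRecordDemo` are hypothetical. The watermark is approximated as the largest timestamp seen minus a fixed delay. That is the `rowtime - INTERVAL '2' SECOND` form used in this thread, and the same form appears in the Flink documentation's bounded out-of-orderness examples.

```java
/**
 * Hypothetical model of event-time bookkeeping in a two-input operator such as
 * an interval join. Plain Java written for illustration, not Flink code.
 */
class TwoInputClock {
    private long leftWatermark = Long.MIN_VALUE;   // no watermark received yet
    private long rightWatermark = Long.MIN_VALUE;

    /** Watermarks only move forward, so keep the largest value seen per input. */
    void advanceLeft(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
    }

    void advanceRight(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
    }

    /** The operator's event time follows its slower input. */
    long currentWatermark() {
        return Math.min(leftWatermark, rightWatermark);
    }

    /** A record is late if its timestamp is already behind the operator watermark. */
    boolean isLate(long timestampMillis) {
        long wm = currentWatermark();
        return wm != Long.MIN_VALUE && timestampMillis < wm;
    }
}

/** Approximates a "rowtime - INTERVAL 'n' SECOND" watermark strategy. */
final class BoundedDelay {
    private BoundedDelay() {}

    static long watermark(long maxTimestampSeenMillis, long delayMillis) {
        return maxTimestampSeenMillis - delayMillis;
    }
}

class LateRecordDemo {
    public static void main(String[] args) {
        TwoInputClock clock = new TwoInputClock();
        // Headers (left) seen up to 102 s, items (right) up to 107 s, both with a 2 s delay.
        clock.advanceLeft(BoundedDelay.watermark(102_000L, 2_000L));
        clock.advanceRight(BoundedDelay.watermark(107_000L, 2_000L));
        System.out.println("operator watermark: " + clock.currentWatermark());
        // An item stamped 99 s shows up after the watermark has already reached 100 s.
        System.out.println("item at 99 s late? " + clock.isLate(99_000L));
        System.out.println("item at 101 s late? " + clock.isLate(101_000L));
    }
}
```

Running `LateRecordDemo` prints an operator watermark of 100000. It reports the 99 s item as late and the 101 s item as on time. Under this model, a volume difference between the two streams does not by itself drop anything. Only a record that arrives more than the watermark delay behind its own stream's newest timestamp can end up late.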
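lxk7...@163.com

From: Shengkai Fang
Sent: 2022-06-11 20:35
To: user-zh
Subject: Re: Re: Question about data loss with Flink interval join

hi,

On the first point: data can be lost in many ways. First, confirm whether it is really the JOIN operator that drops data (misusing the SINK can also lose data). If it is clearly the join operator, work out what the missing records look like. Then check whether the watermark is set unreasonably, so that records are treated as late and dropped. For example, here you have `event time` = `rowtime` - 2s. Could that be inappropriate? I seem to remember it is usually +2 s.

On the second point: my initial understanding is that the interval join cleans up state based on the event time of both sides. In other words, advances in the right stream's event time affect cleanup of the left stream's data. For example, suppose the right stream's time reaches 12:00 and the join condition requires the left stream's time to be at most 1h earlier than the right stream's. Then left-stream data from before 11:00 can be cleaned up.

On the third point: I don't think so. The current inner join plus state cleanup cannot substitute for an event-time window join.

best,
Shengkai

lxk7...@163.com <lxk7...@163.com> wrote on Fri, 2022-06-10 23:03:

I still have serious doubts about this problem, so let me describe my scenario again.

I am using Flink for a two-stream join. One stream is the order header table and the other is the order detail (item) table. We inspected the offline data: a detail record is usually created within a few seconds after its order header, so the gap is on the order of seconds.

We ran the following rounds of tests. Each was compared against another table that is written in real time. That table is the baseline: it is written as-is without any processing, from the same data sources and with the same comparison criteria.
1. DataStream API, watermarks from the Kafka record timestamps, interval join. Result: fewer records.
2. Streams converted to tables, SQL inner join, no watermark. Result: record counts match.
3. Streams converted to tables, SQL interval join, watermarks extracted from the event time in the data. Result: fewer records.

From these results I don't understand why the SQL inner join is accurate while the interval join is not. Is there a good approach or line of investigation that would help me find the cause?

About the second approach: no watermark is set in SQL, so is table state expiration computed in processing time? For a join like this, with a table state TTL configured, can I think of the inner join as effectively a window join?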
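A simplified plain-Java sketch for the question about the second test. As far as I know, Flink's state TTL, which `table.exec.state.ttl` configures, is measured in processing time only. The regular join plan in this thread has no time attribute at all. So whether a header and an item meet depends on when they arrive, not on their event timestamps.

The model below uses the hypothetical names `TtlRegularJoin` and `TtlJoinDemo` and is not Flink code. It buffers every row per join key and treats a buffered row as expired once the TTL of processing time has passed since it was stored. Flink's real TTL cleanup is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of a regular inner join whose buffered rows
 * expire a fixed processing-time interval after they were stored, in the
 * spirit of table.exec.state.ttl. Event time plays no part. Not Flink code.
 */
class TtlRegularJoin {
    private static final class Entry {
        final String row;
        final long storedAtMillis; // processing time at which the row was buffered

        Entry(String row, long storedAtMillis) {
            this.row = row;
            this.storedAtMillis = storedAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, List<Entry>> leftState = new HashMap<>();
    private final Map<String, List<Entry>> rightState = new HashMap<>();

    TtlRegularJoin(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** A header (left) row arrives at processing time nowMillis. */
    List<String> onLeft(String key, String row, long nowMillis) {
        return process(key, row, nowMillis, leftState, rightState, true);
    }

    /** An item (right) row arrives at processing time nowMillis. */
    List<String> onRight(String key, String row, long nowMillis) {
        return process(key, row, nowMillis, rightState, leftState, false);
    }

    private List<String> process(String key, String row, long nowMillis,
                                 Map<String, List<Entry>> ownState,
                                 Map<String, List<Entry>> otherState,
                                 boolean rowIsLeft) {
        List<String> joined = new ArrayList<>();
        List<Entry> candidates = otherState.get(key);
        if (candidates != null) {
            // Rows buffered ttlMillis or more ago count as expired and are removed.
            candidates.removeIf(e -> nowMillis - e.storedAtMillis >= ttlMillis);
            for (Entry e : candidates) {
                joined.add(rowIsLeft ? row + "|" + e.row : e.row + "|" + row);
            }
        }
        // Buffer this row so later rows from the other side can match it.
        ownState.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(row, nowMillis));
        return joined;
    }
}

class TtlJoinDemo {
    public static void main(String[] args) {
        TtlRegularJoin join = new TtlRegularJoin(20_000L); // "20 s", as in the thread
        join.onLeft("order-1", "header-1", 0L);
        System.out.println(join.onRight("order-1", "item-1", 5_000L));  // [header-1|item-1]
        System.out.println(join.onRight("order-1", "item-2", 25_000L)); // []
    }
}
```

In this model, an item that arrives 5 s after its header joins with it. An item that arrives 25 s later finds the header already expired. A regular join with a TTL therefore behaves only loosely like a window, and a processing-time one at that. This is consistent with Shengkai's answer that it cannot stand in for an event-time window join.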
lxk7...@163.com

From: lxk
Sent: 2022-06-10 18:18
To: user-zh
Subject: Re:Re:Re:Re:Re:Re:Re: Question about data loss with Flink interval join

I have now switched to a SQL interval join. The code and execution plan are below, and the other settings are unchanged. The record count is still low, while the inner join has no problem.

Table headerTable =
    streamTableEnvironment.fromDataStream(headerFilterStream,
        Schema.newBuilder()
            .columnByExpression("rowtime",
                "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
            .build());
Table itemTable =
    streamTableEnvironment.fromDataStream(filterItemStream,
        Schema.newBuilder()
            .columnByExpression("rowtime",
                "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
            .build());

streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);

Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
    ",item.goods_id" +
    ",header.id" +
    ",header.order_status" +
    ",header.shop_id" +
    ",header.parent_order_id" +
    ",header.order_at" +
    ",header.pay_at" +
    ",header.channel_id" +
    ",header.root_order_id" +
    ",item.id" +
    ",item.row_num" +
    ",item.p_sp_sub_amt" +
    ",item.display_qty" +
    ",item.qty" +
    ",item.bom_type" +
    " from header JOIN item on header.id = item.order_id" +
    " and item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND" +
    " AND header.rowtime + INTERVAL '20' SECOND");

String intervalJoin = streamTableEnvironment.explainSql("select header.customer_id" +
    ",item.goods_id" +
    ",header.id" +
    ",header.order_status" +
    ",header.shop_id" +
    ",header.parent_order_id" +
    ",header.order_at" +
    ",header.pay_at" +
    ",header.channel_id" +
    ",header.root_order_id" +
    ",item.id" +
    ",item.row_num" +
    ",item.p_sp_sub_amt" +
    ",item.display_qty" +
    ",item.qty" +
    ",item.bom_type" +
    " from header JOIN item on header.id = item.order_id" +
    " and item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND" +
    " AND header.rowtime + INTERVAL '20' SECOND");

System.out.println(intervalJoin);

DataStream<Row> rowDataStream =
    streamTableEnvironment.toChangelogStream(result);

Execution plan:
== Abstract Syntax Tree ==
LogicalProject(customer_id=[$2], goods_id=[$16], id=[$0], order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$13], row_num=[$15], p_sp_sub_amt=[$20], display_qty=[$23], qty=[$18], bom_type=[$21])
+- LogicalJoin(condition=[AND(=($0, $14), >=($25, -($12, 10000:INTERVAL SECOND)), <=($25, +($12, 20000:INTERVAL SECOND)))], joinType=[inner])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($12, 2000:INTERVAL SECOND)])
   :  +- LogicalProject(id=[$0], order_status=[$1], customer_id=[$2], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], last_updated_at=[$9], business_flag=[$10], mysql_op_type=[$11], rowtime=[CAST(SUBSTRING($9, 0, 19)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($12, 2000:INTERVAL SECOND)])
      +- LogicalProject(id=[$0], order_id=[$1], row_num=[$2], goods_id=[$3], s_sku_code=[$4], qty=[$5], p_paid_sub_amt=[$6], p_sp_sub_amt=[$7], bom_type=[$8], last_updated_at=[$9], display_qty=[$10], is_first_flag=[$11], rowtime=[CAST(SUBSTRING($9, 0, 19)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
         +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]])

== Optimized Physical Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-20000, leftUpperBound=10000, leftTimeIndex=9, rightTimeIndex=8], where=[AND(=(id, order_id), >=(rowtime0, -(rowtime, 10000:INTERVAL SECOND)), <=(rowtime0, +(rowtime, 20000:INTERVAL SECOND)))], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, rowtime, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, rowtime0])
   :- Exchange(distribution=[hash[id]])
   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 2000:INTERVAL SECOND)])
   :     +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
   :        +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 2000:INTERVAL SECOND)])
         +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
            +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])

== Optimized Execution Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-20000, leftUpperBound=10000, leftTimeIndex=9, rightTimeIndex=8], where=[((id = order_id) AND (rowtime0 >= (rowtime - 10000:INTERVAL SECOND)) AND (rowtime0 <= (rowtime + 20000:INTERVAL SECOND)))], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, rowtime, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, rowtime0])
   :- Exchange(distribution=[hash[id]])
   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)])
   :     +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
   :        +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)])
         +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
            +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])

On 2022-06-10 17:16:31, "lxk" <lxk7...@163.com> wrote:

I am doing the interval join in SQL. My problem right now is that time conversion feels unfriendly. The event-time field in my stream is a string, with values like 2022-06-10 13:08:55, but converting it with the TO_TIMESTAMP function keeps failing with an error.

On 2022-06-10 15:04:31, "Xuyang" <xyzhong...@163.com> wrote:

Hi, the DataStream interval join API corresponds to the interval join in SQL. But the SQL you wrote is a regular join, and regular joins and interval joins differ in both business semantics and implementation. Comparing the DataStream API interval join directly with a regular SQL join is therefore not really valid. That is why I suggested earlier that you use an interval join in SQL as well, so that the two are comparable.
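To make Xuyang's comparison concrete, here is a small plain-Java check that the thread's three spellings of the window describe the same pairs. `IntervalBounds` is a hypothetical helper, not a Flink class. The three spellings are:

- the DataStream `between(Time.seconds(-10), Time.seconds(20))`, where the item (right) time must lie in [header time - 10 s, header time + 20 s];
- the SQL condition `item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND header.rowtime + INTERVAL '20' SECOND`;
- the plan's `leftLowerBound=-20000, leftUpperBound=10000`, which I read as the header (left) time expressed relative to the item (right) time.

```java
/**
 * Hypothetical helper (not a Flink class) comparing the ways the thread writes
 * the same interval-join window. All timestamps are epoch milliseconds.
 */
final class IntervalBounds {
    private IntervalBounds() {}

    /** DataStream between(lower, upper): leftTs + lower <= rightTs <= leftTs + upper. */
    static boolean rightRelative(long leftTs, long rightTs, long lower, long upper) {
        return leftTs + lower <= rightTs && rightTs <= leftTs + upper;
    }

    /** Plan windowBounds as read here: rightTs + leftLower <= leftTs <= rightTs + leftUpper. */
    static boolean leftRelative(long leftTs, long rightTs, long leftLower, long leftUpper) {
        return rightTs + leftLower <= leftTs && leftTs <= rightTs + leftUpper;
    }

    /** Re-expressing the window from the left side negates and swaps the bounds. */
    static long[] toLeftRelative(long lower, long upper) {
        return new long[] {-upper, -lower};
    }

    public static void main(String[] args) {
        long[] plan = toLeftRelative(-10_000L, 20_000L);
        System.out.println("leftLowerBound=" + plan[0] + ", leftUpperBound=" + plan[1]);
        long header = 1_000_000L;
        for (long offset = -30_000L; offset <= 30_000L; offset += 5_000L) {
            long item = header + offset;
            boolean dataStream = rightRelative(header, item, -10_000L, 20_000L);
            boolean plannerForm = leftRelative(header, item, plan[0], plan[1]);
            System.out.println("item - header = " + offset + " ms: " + dataStream + " / " + plannerForm);
        }
    }
}
```

`main` prints `leftLowerBound=-20000, leftUpperBound=10000`, and the two checks agree at every offset. If that reading of the plan is right, the DataStream and SQL interval joins use the same window. Any difference in their output would then have to come from elsewhere, for example from the timestamps each one uses (Kafka record timestamps versus `last_updated_at`).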
Also, the table.exec.state.ttl parameter you set in SQL only means that expired state is cleared after 20 s. The time window you are comparing against is -10 s to 20 s, which does not seem to be the same thing.

--
Best!
Xuyang

On 2022-06-10 14:33:37, "lxk" <lxk7...@163.com> wrote:

What I don't understand is this: the time window I use for the interval join (lower and upper bounds of -10 s and 20 s) is even longer than the state retention I set in SQL, so why does it lose data?

In SQL I set table.exec.state.ttl to 20 s, so in theory both streams also keep 20 s of data in state for the join. I'm not sure whether my understanding is wrong and would appreciate an explanation.

On 2022-06-10 14:15:29, "Xuyang" <xyzhong...@163.com> wrote:

Hi, your SQL is not an interval join; it is a regular join.
For how to use interval joins, see the documentation [1]. You could try a SQL interval join and check whether it loses data (remember to set the state TTL). That tells you whether the problem lies in the data or in the DataStream API.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins

--
Best!
Xuyang

On 2022-06-10 11:26:33, "lxk" <lxk7...@163.com> wrote:

I used the following code:

String s = streamTableEnvironment.explainSql("select header.customer_id" +
    ",item.goods_id" +
    ",header.id" +
    ",header.order_status" +
    ",header.shop_id" +
    ",header.parent_order_id" +
    ",header.order_at" +
    ",header.pay_at" +
    ",header.channel_id" +
    ",header.root_order_id" +
    ",item.id" +
    ",item.row_num" +
    ",item.p_sp_sub_amt" +
    ",item.display_qty" +
    ",item.qty" +
    ",item.bom_type" +
    " from header JOIN item on header.id = item.order_id");

System.out.println("explain:" + s);

The plan is:

explain:== Abstract Syntax Tree ==
LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
   :- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]])
   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]])

== Optimized Physical Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])

== Optimized Execution Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])

On 2022-06-10 11:02:56, "Shengkai Fang" <fskm...@gmail.com> wrote:

Hello, could you share the concrete plan so everyone can take a look?
You can simply run tEnv.executeSql("Explain JSON_EXECUTION_PLAN <YOUR_QUERY>").print() to print the relevant information.

Best,
Shengkai

lxk <lxk7...@163.com> wrote on Fri, 2022-06-10 10:29:

Flink version: 1.14.4
I am using a Flink interval join to correlate data, and while testing I found a problem. After the interval join, data is lost. When I use the SQL API and join directly, the data is correct and nothing is lost.
The watermark is generated directly from the Kafka record timestamps.

Here is the code (interval join):

SingleOutputStreamOperator<HeaderFull> headerFullStream =
    headerFilterStream.keyBy(data -> data.getId())
        .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
        .between(Time.seconds(-10), Time.seconds(20))
        .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
            @Override
            public void processElement(OrderHeader left, OrderItem right, Context context,
                                       Collector<HeaderFull> collector) throws Exception {
                HeaderFull headerFull = new HeaderFull();
                BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
                beanUtilsBean.copyProperties(headerFull, left);
                beanUtilsBean.copyProperties(headerFull, right);
                String event_date = left.getOrder_at().substring(0, 10);
                headerFull.setEvent_date(event_date);
                headerFull.setItem_id(right.getId());
                collector.collect(headerFull);
            }
        });

Join using SQL:

Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15 s");
conf.setString("table.exec.mini-batch.size","100");
conf.setString("table.exec.state.ttl","20 s");
env.configure(conf);
Table headerTable =
    streamTableEnvironment.fromDataStream(headerFilterStream);
Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);

Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
    ",item.goods_id" +
    ",header.id" +
    ",header.order_status" +
    ",header.shop_id" +
    ",header.parent_order_id" +
    ",header.order_at" +
    ",header.pay_at" +
    ",header.channel_id" +
    ",header.root_order_id" +
    ",item.id" +
    ",item.row_num" +
    ",item.p_sp_sub_amt" +
    ",item.display_qty" +
    ",item.qty" +
    ",item.bom_type" +
    " from header JOIN item on header.id = item.order_id");

DataStream<Row> rowDataStream =
    streamTableEnvironment.toChangelogStream(result);

I don't quite understand why the interval join loses so much data. My understanding was that the SQL join uses something similar to an interval join under the hood. Why, then, do the two produce such different join results?
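On the closing question: as far as I can tell from the plans in this thread, the SQL query without a time condition is planned as a regular `Join`, not an `IntervalJoin`. It therefore matches any header and item with equal keys while both are in state. The plain-Java sketch below compares only the two join conditions, ignoring state expiry and watermarks, to show which pairs only the regular join accepts. `JoinPredicates` is hypothetical and is not Flink code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical comparison of the two join conditions in the thread; not Flink code. */
final class JoinPredicates {
    private JoinPredicates() {}

    /** One candidate header/item pair with event timestamps in milliseconds. */
    static final class Pair {
        final String headerId;
        final String itemOrderId;
        final long headerTs;
        final long itemTs;

        Pair(String headerId, String itemOrderId, long headerTs, long itemTs) {
            this.headerId = headerId;
            this.itemOrderId = itemOrderId;
            this.headerTs = headerTs;
            this.itemTs = itemTs;
        }
    }

    /** ON header.id = item.order_id: only the keys matter. */
    static boolean regularJoin(Pair p) {
        return p.headerId.equals(p.itemOrderId);
    }

    /** Same keys plus item time within [header time - 10 s, header time + 20 s]. */
    static boolean intervalJoin(Pair p) {
        return regularJoin(p)
                && p.itemTs >= p.headerTs - 10_000L
                && p.itemTs <= p.headerTs + 20_000L;
    }

    static long count(List<Pair> pairs, Predicate<Pair> rule) {
        return pairs.stream().filter(rule).count();
    }

    public static void main(String[] args) {
        List<Pair> pairs = Arrays.asList(
                new Pair("o1", "o1", 0L, 3_000L),   // item 3 s after its header
                new Pair("o2", "o2", 0L, 25_000L),  // item 25 s after its header
                new Pair("o3", "o4", 0L, 0L));      // different orders
        System.out.println("regular join matches:  " + count(pairs, JoinPredicates::regularJoin));
        System.out.println("interval join matches: " + count(pairs, JoinPredicates::intervalJoin));
    }
}
```

`main` reports 2 matches for the regular join and 1 for the interval join. The pair whose item is 25 s after its header passes the key check but falls outside the [-10 s, +20 s] window.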