Re: Re: Flink SQL撤回流问题

2020-09-26 文章 lec ssmi
t;xiao cai" 写道: > > >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 > >这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > >收件人: flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > >主题: Re: Flink SQL撤回流问题 > &

Re: Flink SQL撤回流问题

2020-09-26 文章 lec ssmi
是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > select dt,count(distinct id) from source group by dt; > > > 这时mysql对应sink表中有一条数据(1, 2020-0

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-10 文章 lec ssmi
如果只是聚合的结果,像sum这种函数,可以先减去原来的值,然后再加上更新后的值。但如果是count(distinct)呢?还是需要把具体的每个值都存起来把。 Benchao Li 于2020年9月10日周四 下午3:26写道: > sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。 > 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。 > > last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。 > > lec

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-09 文章 lec ssmi
上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗? 感觉底层和 last_value() group by id是一样的。 Benchao Li 于2020年9月10日周四 上午10:34写道: > > 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 > 如果还有自己的binlog格式,也可以自定义format来实现。 > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-09 文章 lec ssmi
直接根据订单的id进行retract(使用last_value group by id ),然后sum就可以了吧。只要你设置的状态保存期是的大于你订单金额的冷却时间就行。 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道: > 请问第1点是有实际的案例使用了么? > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > 谢谢. > > > > > --

Re: FlinkKafkaConsumer问题

2020-09-03 文章 lec ssmi
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level api。是这个意思吧。 op <520075...@qq.com> 于2020年9月4日周五 上午10:25写道: > > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?  > > > -- 原始邮件 -- > 发件人: >

Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

2020-05-07 文章 lec ssmi
使用自定义的Table Sink就可以了啊. Luan Cooper 于2020年5月7日周四 下午8:39写道: > Hi > > 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL > 如下 > > INSERT INTO sink_es // 将更改同步 upsert 到 ES > SELECT * > FROM binlog // mysql 表的 binlog > > 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 >

Re: 关于撤回流的Watermark问题

2020-04-27 文章 lec ssmi
但是我在DataStreamGroupWindowAggregateBase这个类里面,发现以下两个方法都是true啊。 override def needsUpdatesAsRetraction = true override def consumesRetractions = true Benchao Li 于2020年4月28日周二 上午10:19写道: > Hi lec, > > Window Operator目前是不支持retract的输入的。 > > lec ssmi 于2020年4月28日周二 上午9:45写道: > > &g

关于撤回流的Watermark问题

2020-04-27 文章 lec ssmi
Hi: 在tableAPI中,带有时间属性的聚合,比如window聚合,对于retract消息的事件延迟怎么处理呢? 举个例子, 假设上游使用了last_value 操作加over window操作,一直生成一条数据的最新值,然后和另外一个流做join , 再进行 time window聚合操作。现在已经十点,最大延迟为一个小时,这个时候event-time为9点的消息,已经超过了最大延迟,但是在join中仍然生成一条join后的记录(因为join不过滤超时的数据),这条记录会对原先join好的一条记录进行撤回。那这个撤回消息,在到达time window的时候,因为超过了

Re: 晚于watermark的数据何时被抛弃

2020-04-27 文章 lec ssmi
er这种,不涉及到时间的概念,不会丢弃数据的。 > > lec ssmi 于2020年4月27日周一 下午5:38写道: > > > Hi: > > 如果有晚于watermark的数据,只有涉及到时间的算子,比如时间窗口,才会自动地过滤掉这些数据吗?或者说其他算子,比如map,join > > 也会自动过滤掉而不处理? > > 感觉类似于ProcessFunction,提供了一个获取currentWatermark的方法,是否不处理,都取决于自己的代码逻辑。 > > > > > -- >

晚于watermark的数据何时被抛弃

2020-04-27 文章 lec ssmi
Hi: 如果有晚于watermark的数据,只有涉及到时间的算子,比如时间窗口,才会自动地过滤掉这些数据吗?或者说其他算子,比如map,join 也会自动过滤掉而不处理? 感觉类似于ProcessFunction,提供了一个获取currentWatermark的方法,是否不处理,都取决于自己的代码逻辑。

user-zh@flink.apache.org

2020-04-26 文章 lec ssmi
Hi: 将TwoPhaseCommitSinkFunction 在TableAPI中的Sink实现,能够生效吗?看案例都是DataStream API在使用。 谢谢。

人为生成retract记录

2020-04-25 文章 lec ssmi
Hi: 假设我现在将上游retract后的结果写入到kafka,然后下游程序消费kafka去做聚合操作。 因为需要利用聚合算子能够自动处理retract的特性,所以需要将kafka的结果封装成带有不同header的row,即组装为INSERT DELETE UPDATE类型的数据。 有什么办法可以解决吗? 如果将上下游程序合在一起 ,是没问题的,现在的难点就是拆分。 谢谢。

Re: retract的问题

2020-04-23 文章 lec ssmi
不好意思,刚才看了一下源码: [image: image.png] 这个是over window的聚合操作。 这个类实现没有实现producesUpdates 和producesRetractions, 而这两个方法的默认值都是False。是否说明,只能有INSERT类型的记录? 如果是的话,不就是说明over window操作的输出是一个Append-only stream? lec ssmi 于2020年4月23日周四 下午5:13写道: > 明白了,谢谢。 > > Benchao Li 于2020年4月23日周四 下午5:08写道: > >&g

Re: retract的问题

2020-04-23 文章 lec ssmi
明白了,谢谢。 Benchao Li 于2020年4月23日周四 下午5:08写道: > 不是这个意思。是说scalar function不需要额外处理,所以它一套逻辑就可以处理两种类型的消息了。 > 它不需要区分消息类型,只需要处理消息本身(消息类型是在header里)。 > > lec ssmi 于2020年4月23日周四 下午5:00写道: > > > 那也就是说UDF这种ScalarFunction,是没有办法处理Retract的了?因为它把DELETE记录和INSERT记录都做了相同的操作。 > > > >

Re: retract的问题

2020-04-23 文章 lec ssmi
nction对于消息的类型是不需要判断的,处理过程都是一样的。 > > [1] https://issues.apache.org/jira/browse/FLINK-17343 > > lec ssmi 于2020年4月23日周四 下午4:41写道: > > > 其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话, > > 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于 > > if( type='DELETE'

Re: retract的问题

2020-04-23 文章 lec ssmi
Benchao Li 于2020年4月23日周四 下午4:33写道: > 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。 > > lec ssmi 于2020年4月23日周四 下午4:29写道: > > > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over > > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。 > > > > Benchao Li 于20

Re: retract的问题

2020-04-23 文章 lec ssmi
奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over window做的,然后再做的join,然后将join的结果进行tumble window 聚合。 Benchao Li 于2020年4月23日周四 下午4:26写道: > time interval join不允许输入是非append的。 > > > lec ssmi 于2020年4月23日周四 下午4:18写道: > > > 那如果是两个retract算子后的流进行time interval join, > > 已经j

Re: retract的问题

2020-04-23 文章 lec ssmi
t; group by也需要。 > > lec ssmi 于2020年4月23日周四 下午4:05写道: > > > 谢谢。 > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。 > > 但是对于Table > > API&SQL来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。 > > 我在编写UDAF的时候,里面有个retract方法,注释写的是

Re: retract的问题

2020-04-23 文章 lec ssmi
我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。 Benchao Li 于2020年4月23日周四 下午3:59写道: > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。 > > lec ssmi 于2020年4月23日周四 下午3:45写道: > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一

Re: retract的问题

2020-04-23 文章 lec ssmi
gt; 还有些算子本身不会产生,但是会传递,比如calc算子 > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > 这个也不绝对。大部分时候是。 > 这个取决于这个算子本身是不是会consume > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。 > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > >

retract的问题

2020-04-23 文章 lec ssmi
Hi: 有几个问题想咨询下大佬: 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?

Re: Re: spark代码直接运行至Flink平台

2020-04-21 文章 lec ssmi
那还不如直接用apache beam直接将这些框架的API全部统一起来。 hsdcl...@163.com 于2020年4月22日周三 上午11:27写道: > 降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java, > spark就像Scala一样 > > > > > hsdcl...@163.com > > 发件人: Jeff Zhang > 发送时间: 2020-04-22 10:52 > 收件人: user-zh > 主题: Re: spark代码直接运行至Flink平台 > 啥目的 ? > > hsdcl

Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 lec ssmi
watermark的重新生成,是将新的watermark也加入到watermark队列,然后选出一个最值,还是将原先的watermark直接丢弃掉,改用新的? taowang 于2020年4月17日周五 上午10:46写道: > 是的,是生效了的。没见到别人这样用过,在我的场景里这个操作是实现了我需要的逻辑,但会不会引起其他问题,我暂时还没有发现。 > > > 原始邮件 > 发件人: lec ssmi > 收件人: flink-user-cn > 发送时间: 2020年4月17日(周五) 09:25 > 主题: Re: 为消息分配

Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 lec ssmi
请问,你对DataStream重新声明时间列和水印,生效吗? taowang 于2020年4月16日周四 下午10:49写道: > 嗯嗯,还是十分感谢。 > 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ > > > 原始邮件 > 发件人: tison > 收件人: user-zh > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re: 为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文…

Re: 重复声明watermark的问题

2020-04-10 文章 lec ssmi
下的sql window join, 那么下游收到的 > watermark是较小一侧表的watermark - sql设定的时间窗口. > > lec ssmi 于2020年4月8日周三 下午2:05写道: > > > 大家好: > > 请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗? > > 比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。 > > 另外,稍微

重复声明watermark的问题

2020-04-07 文章 lec ssmi
大家好: 请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗? 比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。 另外,稍微咨询下另外一个问题,两个流join之后,watermark会消失吗?看书上说的是,以两个流最小的watermark(全局最小)为准。 主要是在阿里云Blink上,使用sql进行join后,说的是时间属性字段会消失。有点不明白。

批流join特性

2020-03-19 文章 lec ssmi
在实际生产中,很多时候需要将流和维表join,维表可能来自于hive或者mysql。这种情况下,类似于spark structured streaming中,可以直接使用一套API将流和批之间join。并且如果维表的数据改变,计算用的数据也会随着改变。而目前FLink底层虽然采用了相同的思维来实现批和流,但是批流两种API没有统一,要想实现维表join的途径比较麻烦。社区为啥不考虑下将这种特性封装下呢?