t;xiao cai" 写道:
>
> >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。
> >这是我很困惑的地方。
> >
> >
> > 原始邮件
> >发件人: lec ssmi
> >收件人: flink-user-cn
> >发送时间: 2020年9月27日(周日) 13:06
> >主题: Re: Flink SQL撤回流问题
> &
是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。
xiao cai 于2020年9月27日周日 下午12:08写道:
> 场景如下:
> source table: kafka
> sink table: mysql schem(id, dt, cnt)
>
>
> insert :
> insert into sink
> select dt,count(distinct id) from source group by dt;
>
>
> 这时mysql对应sink表中有一条数据(1, 2020-0
如果只是聚合的结果,像sum这种函数,可以先减去原来的值,然后再加上更新后的值。但如果是count(distinct)呢?还是需要把具体的每个值都存起来把。
Benchao Li 于2020年9月10日周四 下午3:26写道:
> sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。
> 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。
>
> last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。
>
> lec
上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
感觉底层和 last_value() group by id是一样的。
Benchao Li 于2020年9月10日周四 上午10:34写道:
>
> 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。
> 如果还有自己的binlog格式,也可以自定义format来实现。
>
> 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可
直接根据订单的id进行retract(使用last_value group by id
),然后sum就可以了吧。只要你设置的状态保存期是的大于你订单金额的冷却时间就行。
忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道:
> 请问第1点是有实际的案例使用了么?
> 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录?
> 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的
> 谢谢.
>
>
>
>
> --
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level
api。是这个意思吧。
op <520075...@qq.com> 于2020年9月4日周五 上午10:25写道:
>
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?
>
>
> -- 原始邮件 --
> 发件人:
>
使用自定义的Table Sink就可以了啊.
Luan Cooper 于2020年5月7日周四 下午8:39写道:
> Hi
>
> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL
> 如下
>
> INSERT INTO sink_es // 将更改同步 upsert 到 ES
> SELECT *
> FROM binlog // mysql 表的 binlog
>
> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
>
但是我在DataStreamGroupWindowAggregateBase这个类里面,发现以下两个方法都是true啊。
override def needsUpdatesAsRetraction = true
override def consumesRetractions = true
Benchao Li 于2020年4月28日周二 上午10:19写道:
> Hi lec,
>
> Window Operator目前是不支持retract的输入的。
>
> lec ssmi 于2020年4月28日周二 上午9:45写道:
>
> &g
Hi:
在tableAPI中,带有时间属性的聚合,比如window聚合,对于retract消息的事件延迟怎么处理呢?
举个例子,
假设上游使用了last_value 操作加over window操作,一直生成一条数据的最新值,然后和另外一个流做join , 再进行 time
window聚合操作。现在已经十点,最大延迟为一个小时,这个时候event-time为9点的消息,已经超过了最大延迟,但是在join中仍然生成一条join后的记录(因为join不过滤超时的数据),这条记录会对原先join好的一条记录进行撤回。那这个撤回消息,在到达time
window的时候,因为超过了
er这种,不涉及到时间的概念,不会丢弃数据的。
>
> lec ssmi 于2020年4月27日周一 下午5:38写道:
>
> > Hi:
> > 如果有晚于watermark的数据,只有涉及到时间的算子,比如时间窗口,才会自动地过滤掉这些数据吗?或者说其他算子,比如map,join
> > 也会自动过滤掉而不处理?
> > 感觉类似于ProcessFunction,提供了一个获取currentWatermark的方法,是否不处理,都取决于自己的代码逻辑。
> >
>
>
> --
>
Hi:
如果有晚于watermark的数据,只有涉及到时间的算子,比如时间窗口,才会自动地过滤掉这些数据吗?或者说其他算子,比如map,join
也会自动过滤掉而不处理?
感觉类似于ProcessFunction,提供了一个获取currentWatermark的方法,是否不处理,都取决于自己的代码逻辑。
Hi:
将TwoPhaseCommitSinkFunction 在TableAPI中的Sink实现,能够生效吗?看案例都是DataStream
API在使用。
谢谢。
Hi:
假设我现在将上游retract后的结果写入到kafka,然后下游程序消费kafka去做聚合操作。
因为需要利用聚合算子能够自动处理retract的特性,所以需要将kafka的结果封装成带有不同header的row,即组装为INSERT
DELETE UPDATE类型的数据。
有什么办法可以解决吗?
如果将上下游程序合在一起 ,是没问题的,现在的难点就是拆分。
谢谢。
不好意思,刚才看了一下源码:
[image: image.png]
这个是over window的聚合操作。
这个类实现没有实现producesUpdates 和producesRetractions,
而这两个方法的默认值都是False。是否说明,只能有INSERT类型的记录?
如果是的话,不就是说明over window操作的输出是一个Append-only stream?
lec ssmi 于2020年4月23日周四 下午5:13写道:
> 明白了,谢谢。
>
> Benchao Li 于2020年4月23日周四 下午5:08写道:
>
>&g
明白了,谢谢。
Benchao Li 于2020年4月23日周四 下午5:08写道:
> 不是这个意思。是说scalar function不需要额外处理,所以它一套逻辑就可以处理两种类型的消息了。
> 它不需要区分消息类型,只需要处理消息本身(消息类型是在header里)。
>
> lec ssmi 于2020年4月23日周四 下午5:00写道:
>
> > 那也就是说UDF这种ScalarFunction,是没有办法处理Retract的了?因为它把DELETE记录和INSERT记录都做了相同的操作。
> >
> >
nction对于消息的类型是不需要判断的,处理过程都是一样的。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17343
>
> lec ssmi 于2020年4月23日周四 下午4:41写道:
>
> > 其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话,
> > 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于
> > if( type='DELETE'
Benchao Li 于2020年4月23日周四 下午4:33写道:
> 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。
>
> lec ssmi 于2020年4月23日周四 下午4:29写道:
>
> > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over
> > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。
> >
> > Benchao Li 于20
奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over
window做的,然后再做的join,然后将join的结果进行tumble window 聚合。
Benchao Li 于2020年4月23日周四 下午4:26写道:
> time interval join不允许输入是非append的。
>
>
> lec ssmi 于2020年4月23日周四 下午4:18写道:
>
> > 那如果是两个retract算子后的流进行time interval join,
> > 已经j
t; group by也需要。
>
> lec ssmi 于2020年4月23日周四 下午4:05写道:
>
> > 谢谢。
> > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。
> > 但是对于Table
> > API&SQL来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。
> > 我在编写UDAF的时候,里面有个retract方法,注释写的是
我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。
Benchao Li 于2020年4月23日周四 下午3:59写道:
> 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。
>
> lec ssmi 于2020年4月23日周四 下午3:45写道:
>
> > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一
gt; 还有些算子本身不会产生,但是会传递,比如calc算子
>
> 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
>
> 这个也不绝对。大部分时候是。
> 这个取决于这个算子本身是不是会consume
> retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。
>
> 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
>
>
Hi:
有几个问题想咨询下大佬:
1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
那还不如直接用apache beam直接将这些框架的API全部统一起来。
hsdcl...@163.com 于2020年4月22日周三 上午11:27写道:
> 降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java,
> spark就像Scala一样
>
>
>
>
> hsdcl...@163.com
>
> 发件人: Jeff Zhang
> 发送时间: 2020-04-22 10:52
> 收件人: user-zh
> 主题: Re: spark代码直接运行至Flink平台
> 啥目的 ?
>
> hsdcl
watermark的重新生成,是将新的watermark也加入到watermark队列,然后选出一个最值,还是将原先的watermark直接丢弃掉,改用新的?
taowang 于2020年4月17日周五 上午10:46写道:
> 是的,是生效了的。没见到别人这样用过,在我的场景里这个操作是实现了我需要的逻辑,但会不会引起其他问题,我暂时还没有发现。
>
>
> 原始邮件
> 发件人: lec ssmi
> 收件人: flink-user-cn
> 发送时间: 2020年4月17日(周五) 09:25
> 主题: Re: 为消息分配
请问,你对DataStream重新声明时间列和水印,生效吗?
taowang 于2020年4月16日周四 下午10:49写道:
> 嗯嗯,还是十分感谢。
> 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。
>
>
> 打扰各位了,祝好!~
>
>
> 原始邮件
> 发件人: tison
> 收件人: user-zh
> 发送时间: 2020年4月16日(周四) 22:39
> 主题: Re: 为消息分配时间戳但不想重新分配水印
>
>
> 正在载入邮件原文…
下的sql window join, 那么下游收到的
> watermark是较小一侧表的watermark - sql设定的时间窗口.
>
> lec ssmi 于2020年4月8日周三 下午2:05写道:
>
> > 大家好:
> > 请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗?
> > 比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。
> > 另外,稍微
大家好:
请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗?
比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。
另外,稍微咨询下另外一个问题,两个流join之后,watermark会消失吗?看书上说的是,以两个流最小的watermark(全局最小)为准。
主要是在阿里云Blink上,使用sql进行join后,说的是时间属性字段会消失。有点不明白。
在实际生产中,很多时候需要将流和维表join,维表可能来自于hive或者mysql。这种情况下,类似于spark structured
streaming中,可以直接使用一套API将流和批之间join。并且如果维表的数据改变,计算用的数据也会随着改变。而目前FLink底层虽然采用了相同的思维来实现批和流,但是批流两种API没有统一,要想实现维表join的途径比较麻烦。社区为啥不考虑下将这种特性封装下呢?
28 matches
Mail list logo