Re: Re: Flink SQL撤回流问题
你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 kandy.wang 于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 写道: > > >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 > >这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > >收件人: flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > >主题: Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > > select dt,count(distinct id) from source group by dt; > > > > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show > create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > > 发件人: Michael Ran > 收件人: user-zh< > user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" > 写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >
Re: Flink SQL撤回流问题
是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > select dt,count(distinct id) from source group by dt; > > > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > 发件人: Michael Ran > 收件人: user-zh > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" 写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。
Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
如果只是聚合的结果,像sum这种函数,可以先减去原来的值,然后再加上更新后的值。但如果是count(distinct)呢?还是需要把具体的每个值都存起来把。 Benchao Li 于2020年9月10日周四 下午3:26写道: > sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。 > 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。 > > last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。 > > lec ssmi 于2020年9月10日周四 下午2:35写道: > > > 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗? > > 感觉底层和 last_value() group by id是一样的。 > > > > Benchao Li 于2020年9月10日周四 上午10:34写道: > > > > > > > > > > > 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 > > > 如果还有自己的binlog格式,也可以自定义format来实现。 > > > > > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 > > > 1. append / update_after 消息会累加到聚合指标上 > > > 2. delete / update_before 消息会从聚合指标上进行retract > > > > > > > > > [1] > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html > > > [2] > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html > > > > > > 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道: > > > > > > > 请问第1点是有实际的案例使用了么? > > > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > > > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > > > > 谢谢. > > > > > > > > > > > > > > > > > > > > --原始邮件-- > > > > 发件人: > > > > "user-zh" > > > > < > > > > libenc...@apache.org; > > > > 发送时间:2020年9月9日(星期三) 中午1:09 > > > > 收件人:"user-zh" > > > > > > > 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > > > > > > > > > > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > > > > 1. 首先版本是1.11+, 可以直接用binlog > > > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > > > > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > > > > yyy这种,那这个sum指标会自动做好这件事。 > > > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] > > > 将append数据流转成retract数据流,这样下游再用同样的 > > > > 聚合逻辑,效果也是一样的。 > > > > > > > > [1] > > > > > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > > > > > > > > > > xuzh > > > > > > > 场景: > > > > nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > > > > nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > > > > nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka > > ,GMV实时统计为1000. > > > > nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > > > > 这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp; > > > > 请问是不是根据 update /delete > > > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > > > > > > > > > > nbsp; 刚入坑实时处理,请大神赐教 > > > > > > > > > > > > > > > > -- > > > > > > > > Best, > > > > Benchao Li > > > > > > > > > > > > -- > > > > > > Best, > > > Benchao Li > > > > > > > > -- > > Best, > Benchao Li >
Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗? 感觉底层和 last_value() group by id是一样的。 Benchao Li 于2020年9月10日周四 上午10:34写道: > > 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 > 如果还有自己的binlog格式,也可以自定义format来实现。 > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 > 1. append / update_after 消息会累加到聚合指标上 > 2. delete / update_before 消息会从聚合指标上进行retract > > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html > > 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道: > > > 请问第1点是有实际的案例使用了么? > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > > 谢谢. > > > > > > > > > > --原始邮件-- > > 发件人: > > "user-zh" > > < > > libenc...@apache.org; > > 发送时间:2020年9月9日(星期三) 中午1:09 > > 收件人:"user-zh" > > > 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > > 1. 首先版本是1.11+, 可以直接用binlog > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > > yyy这种,那这个sum指标会自动做好这件事。 > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] > 将append数据流转成retract数据流,这样下游再用同样的 > > 聚合逻辑,效果也是一样的。 > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > > > > xuzh > > > 场景: > > nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > > nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > > nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > > nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > > 这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp; > > 请问是不是根据 update /delete > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > > > > nbsp; 刚入坑实时处理,请大神赐教 > > > > > > > > -- > > > > Best, > > Benchao Li > > > > -- > > Best, > Benchao Li >
Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
直接根据订单的id进行retract(使用last_value group by id ),然后sum就可以了吧。只要你设置的状态保存期是的大于你订单金额的冷却时间就行。 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道: > 请问第1点是有实际的案例使用了么? > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > 谢谢. > > > > > --原始邮件-- > 发件人: > "user-zh" > < > libenc...@apache.org; > 发送时间:2020年9月9日(星期三) 中午1:09 > 收件人:"user-zh" > 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > 1. 首先版本是1.11+, 可以直接用binlog > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > yyy这种,那这个sum指标会自动做好这件事。 > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的 > 聚合逻辑,效果也是一样的。 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > xuzh > 场景: > nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > 这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp; > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > nbsp; 刚入坑实时处理,请大神赐教 > > > > -- > > Best, > Benchao Li
Re: FlinkKafkaConsumer问题
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level api。是这个意思吧。 op <520075...@qq.com> 于2020年9月4日周五 上午10:25写道: > > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢? > > > --原始邮件-- > 发件人: > "user-zh" > < > acqua@gmail.com; > 发送时间:2020年9月3日(星期四) 晚上6:09 > 收件人:"user-zh" > 主题:Re: FlinkKafkaConsumer问题 > > > > Hi op, > > 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 > partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 > Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 > > 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit > offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。 > > 在 2020年9月3日,下午3:03,op <520075...@qq.com 写道: > > nbsp; nbsp; hi,nbsp; nbsp; > 我对FlinkKafkaConsumer的实现有点迷惑,nbsp; nbsp; 这有两个相同代码的程序: > //--- > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment > Env.setRestartStrategy(RestartStrategies.noRestart()) > val consumerProps = new Properties() > consumerProps.put("bootstrap.servers", brokers) > consumerProps.put("group.id", "test1234") > > val consumer = new FlinkKafkaConsumer[String](topic,new > KafkaStringSchema,consumerProps).setStartFromLatest() > Env.addSource(consumer).print() > > Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic, > group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer > group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 > 谢谢
Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问
使用自定义的Table Sink就可以了啊. Luan Cooper 于2020年5月7日周四 下午8:39写道: > Hi > > 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL > 如下 > > INSERT INTO sink_es // 将更改同步 upsert 到 ES > SELECT * > FROM binlog // mysql 表的 binlog > > 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 > 但是上面的 SQL 是做不到的,只会一直 Insert > > 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? > > 社区的 FLIP-87 > > https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API > 可以解决这个问题吗? > > 感谢 >
Re: 关于撤回流的Watermark问题
但是我在DataStreamGroupWindowAggregateBase这个类里面,发现以下两个方法都是true啊。 override def needsUpdatesAsRetraction = true override def consumesRetractions = true Benchao Li 于2020年4月28日周二 上午10:19写道: > Hi lec, > > Window Operator目前是不支持retract的输入的。 > > lec ssmi 于2020年4月28日周二 上午9:45写道: > > > Hi: > >在tableAPI中,带有时间属性的聚合,比如window聚合,对于retract消息的事件延迟怎么处理呢? > >举个例子, > >假设上游使用了last_value 操作加over window操作,一直生成一条数据的最新值,然后和另外一个流做join , 再进行 > time > > > > > window聚合操作。现在已经十点,最大延迟为一个小时,这个时候event-time为9点的消息,已经超过了最大延迟,但是在join中仍然生成一条join后的记录(因为join不过滤超时的数据),这条记录会对原先join好的一条记录进行撤回。那这个撤回消息,在到达time > > window的时候,因为超过了最大延迟,DELELE记录和INSERT记录都不会被处理吗? > > 谢谢。 > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn >
关于撤回流的Watermark问题
Hi: 在tableAPI中,带有时间属性的聚合,比如window聚合,对于retract消息的事件延迟怎么处理呢? 举个例子, 假设上游使用了last_value 操作加over window操作,一直生成一条数据的最新值,然后和另外一个流做join , 再进行 time window聚合操作。现在已经十点,最大延迟为一个小时,这个时候event-time为9点的消息,已经超过了最大延迟,但是在join中仍然生成一条join后的记录(因为join不过滤超时的数据),这条记录会对原先join好的一条记录进行撤回。那这个撤回消息,在到达time window的时候,因为超过了最大延迟,DELELE记录和INSERT记录都不会被处理吗? 谢谢。
Re: 晚于watermark的数据何时被抛弃
谢谢回答。 但这样存在一个问题,加入我不使用window,用普通的group by hour 来实现聚合,hour为string类型。我也需要丢弃掉晚于watermark的数据, sql中,在 TableAggregateFunction 里面是无法操作的。 DataStream 有ProcessFunction,当然是可以实现的。 Benchao Li 于2020年4月27日周一 下午5:47写道: > Hi, > > 你的理解是对的,只有涉及到时间的一些算子才会有可能丢弃迟到的数据,比如典型的就是Window和CEP。 > 像普通的算子Map、Filter这种,不涉及到时间的概念,不会丢弃数据的。 > > lec ssmi 于2020年4月27日周一 下午5:38写道: > > > Hi: > > 如果有晚于watermark的数据,只有涉及到时间的算子,比如时间窗口,才会自动地过滤掉这些数据吗?或者说其他算子,比如map,join > > 也会自动过滤掉而不处理? > > 感觉类似于ProcessFunction,提供了一个获取currentWatermark的方法,是否不处理,都取决于自己的代码逻辑。 > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn >
晚于watermark的数据何时被抛弃
Hi: 如果有晚于watermark的数据,只有涉及到时间的算子,比如时间窗口,才会自动地过滤掉这些数据吗?或者说其他算子,比如map,join 也会自动过滤掉而不处理? 感觉类似于ProcessFunction,提供了一个获取currentWatermark的方法,是否不处理,都取决于自己的代码逻辑。
TwoPhaseCommitSinkFunction在TableAPI 中的应用
Hi: 将TwoPhaseCommitSinkFunction 在TableAPI中的Sink实现,能够生效吗?看案例都是DataStream API在使用。 谢谢。
人为生成retract记录
Hi: 假设我现在将上游retract后的结果写入到kafka,然后下游程序消费kafka去做聚合操作。 因为需要利用聚合算子能够自动处理retract的特性,所以需要将kafka的结果封装成带有不同header的row,即组装为INSERT DELETE UPDATE类型的数据。 有什么办法可以解决吗? 如果将上下游程序合在一起 ,是没问题的,现在的难点就是拆分。 谢谢。
Re: retract的问题
不好意思,刚才看了一下源码: [image: image.png] 这个是over window的聚合操作。 这个类实现没有实现producesUpdates 和producesRetractions, 而这两个方法的默认值都是False。是否说明,只能有INSERT类型的记录? 如果是的话,不就是说明over window操作的输出是一个Append-only stream? lec ssmi 于2020年4月23日周四 下午5:13写道: > 明白了,谢谢。 > > Benchao Li 于2020年4月23日周四 下午5:08写道: > >> 不是这个意思。是说scalar function不需要额外处理,所以它一套逻辑就可以处理两种类型的消息了。 >> 它不需要区分消息类型,只需要处理消息本身(消息类型是在header里)。 >> >> lec ssmi 于2020年4月23日周四 下午5:00写道: >> >> > 那也就是说UDF这种ScalarFunction,是没有办法处理Retract的了?因为它把DELETE记录和INSERT记录都做了相同的操作。 >> > >> > Benchao Li 于2020年4月23日周四 下午4:54写道: >> > >> > > Hi Jingsong, >> > > 我建了一个jira[1] 来跟踪这个事情。 >> > > >> > > Hi lec, >> > > sum函数不属于scalar函数。sum的内置实现是有retract的版本的,参考:IntSumWithRetractAggFunction >> > > scalar function不需要这样子处理,因为它本身没有状态。scalar >> > function对于消息的类型是不需要判断的,处理过程都是一样的。 >> > > >> > > [1] https://issues.apache.org/jira/browse/FLINK-17343 >> > > >> > > lec ssmi 于2020年4月23日周四 下午4:41写道: >> > > >> > > > 其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话, >> > > > 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于 >> > > > if( type='DELETE'){ >> > > > sum=sum-value >> > > > } else if(type='INSERT'){ >> > > > sum=sum+value >> > > >} >> > > > 的逻辑呢? >> > > > 但是在ScalarFunction中,只实现了eval方法,也就是只有 >> INSERT的那部分相加的逻辑,没有DELETE那部分相减的逻辑。 >> > > > >> > > > Benchao Li 于2020年4月23日周四 下午4:33写道: >> > > > >> > > > > 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。 >> > > > > >> > > > > lec ssmi 于2020年4月23日周四 下午4:29写道: >> > > > > >> > > > > > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over >> > > > > > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。 >> > > > > > >> > > > > > Benchao Li 于2020年4月23日周四 下午4:26写道: >> > > > > > >> > > > > > > time interval join不允许输入是非append的。 >> > > > > > > >> > > > > > > >> > > > > > > lec ssmi 于2020年4月23日周四 下午4:18写道: >> > > > > > > >> > > > > > > > 那如果是两个retract算子后的流进行time interval join, >> > > > > > > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游? >> > > > > > > > >> > > > > > > > Benchao Li 于2020年4月23日周四 下午4:11写道: >> > > > > > > > >> > > > > > > > > 内置的*聚合函数*应该是都能处理retract消息的。 >> > > > > > > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。 >> > > > > > > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over >> > > > > > > > window的确是会需要处理retract,除此之外,regular >> > > > > > > > > group by也需要。 >> > > > > > > > > >> > > > > > > > > lec ssmi 于2020年4月23日周四 >> 下午4:05写道: >> > > > > > > > > >> > > > > > > > > > 谢谢。 >> > > > > > > > > > >> > > > > >> 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。 >> > > > > > > > > > 但是对于Table >> > > > > > > > > > >> > > > > >> API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。 >> > > > > > > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be >> > > > > implemented >> > > > > > > > > > for datastream bounded over aggregate 。 是否说只有over >> > > > > > > window的时候才有retract? >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。 >> > > > > > > > > > >> > > > > > > > > > Benchao Li 于2020年4月23日周四 >> 下午3:59写道: >> > > > > > > > > > >> > > > > > > > > > > 这个暂
Re: retract的问题
明白了,谢谢。 Benchao Li 于2020年4月23日周四 下午5:08写道: > 不是这个意思。是说scalar function不需要额外处理,所以它一套逻辑就可以处理两种类型的消息了。 > 它不需要区分消息类型,只需要处理消息本身(消息类型是在header里)。 > > lec ssmi 于2020年4月23日周四 下午5:00写道: > > > 那也就是说UDF这种ScalarFunction,是没有办法处理Retract的了?因为它把DELETE记录和INSERT记录都做了相同的操作。 > > > > Benchao Li 于2020年4月23日周四 下午4:54写道: > > > > > Hi Jingsong, > > > 我建了一个jira[1] 来跟踪这个事情。 > > > > > > Hi lec, > > > sum函数不属于scalar函数。sum的内置实现是有retract的版本的,参考:IntSumWithRetractAggFunction > > > scalar function不需要这样子处理,因为它本身没有状态。scalar > > function对于消息的类型是不需要判断的,处理过程都是一样的。 > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-17343 > > > > > > lec ssmi 于2020年4月23日周四 下午4:41写道: > > > > > > > 其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话, > > > > 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于 > > > > if( type='DELETE'){ > > > > sum=sum-value > > > > } else if(type='INSERT'){ > > > > sum=sum+value > > > >} > > > > 的逻辑呢? > > > > 但是在ScalarFunction中,只实现了eval方法,也就是只有 > INSERT的那部分相加的逻辑,没有DELETE那部分相减的逻辑。 > > > > > > > > Benchao Li 于2020年4月23日周四 下午4:33写道: > > > > > > > > > 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。 > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午4:29写道: > > > > > > > > > > > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over > > > > > > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。 > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午4:26写道: > > > > > > > > > > > > > time interval join不允许输入是非append的。 > > > > > > > > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午4:18写道: > > > > > > > > > > > > > > > 那如果是两个retract算子后的流进行time interval join, > > > > > > > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游? > > > > > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午4:11写道: > > > > > > > > > > > > > > > > > 内置的*聚合函数*应该是都能处理retract消息的。 > > > > > > > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。 > > > > > > > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over > > > > > > > > window的确是会需要处理retract,除此之外,regular > > > > > > > > > group by也需要。 > > > > > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午4:05写道: > > > > > > > > > > > > > > > > > > > 谢谢。 > > > > > > > > > > > > > > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。 > > > > > > > > > > 但是对于Table > > > > > > > > > > > > > > > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。 > > > > > > > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be > > > > > implemented > > > > > > > > > > for datastream bounded over aggregate 。 是否说只有over > > > > > > > window的时候才有retract? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。 > > > > > > > > > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午3:59写道: > > > > > > > > > > > > > > > > > > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。 > > > > > > > > > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 > > 下午3:45写道: > > > > > > > > > > > > > > > > > > > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 > > 下午3:39写道: > > > > > > > > > > > > > &g
Re: retract的问题
那也就是说UDF这种ScalarFunction,是没有办法处理Retract的了?因为它把DELETE记录和INSERT记录都做了相同的操作。 Benchao Li 于2020年4月23日周四 下午4:54写道: > Hi Jingsong, > 我建了一个jira[1] 来跟踪这个事情。 > > Hi lec, > sum函数不属于scalar函数。sum的内置实现是有retract的版本的,参考:IntSumWithRetractAggFunction > scalar function不需要这样子处理,因为它本身没有状态。scalar function对于消息的类型是不需要判断的,处理过程都是一样的。 > > [1] https://issues.apache.org/jira/browse/FLINK-17343 > > lec ssmi 于2020年4月23日周四 下午4:41写道: > > > 其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话, > > 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于 > > if( type='DELETE'){ > > sum=sum-value > > } else if(type='INSERT'){ > > sum=sum+value > >} > > 的逻辑呢? > > 但是在ScalarFunction中,只实现了eval方法,也就是只有 INSERT的那部分相加的逻辑,没有DELETE那部分相减的逻辑。 > > > > Benchao Li 于2020年4月23日周四 下午4:33写道: > > > > > 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。 > > > > > > lec ssmi 于2020年4月23日周四 下午4:29写道: > > > > > > > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over > > > > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。 > > > > > > > > Benchao Li 于2020年4月23日周四 下午4:26写道: > > > > > > > > > time interval join不允许输入是非append的。 > > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午4:18写道: > > > > > > > > > > > 那如果是两个retract算子后的流进行time interval join, > > > > > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游? > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午4:11写道: > > > > > > > > > > > > > 内置的*聚合函数*应该是都能处理retract消息的。 > > > > > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。 > > > > > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over > > > > > > window的确是会需要处理retract,除此之外,regular > > > > > > > group by也需要。 > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午4:05写道: > > > > > > > > > > > > > > > 谢谢。 > > > > > > > > > > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。 > > > > > > > > 但是对于Table > > > > > > > > > > > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。 > > > > > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be > > > implemented > > > > > > > > for datastream bounded over aggregate 。 是否说只有over > > > > > window的时候才有retract? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。 > > > > > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午3:59写道: > > > > > > > > > > > > > > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。 > > > > > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午3:45写道: > > > > > > > > > > > > > > > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午3:39写道: > > > > > > > > > > > > > > > > > > > > > Hi lec, > > > > > > > > > > > > > > > > > > > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > > > > > > > > > > > > > > > > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。 > > > > > > > > > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early > > > > fire或者late > > > > > > > > fire的时候。 > > > > > > > > > > > 还有些算子本身不会产生,但是会传递,比如calc算子 > > > > > > > > > > > > > > > > > > > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > > > > > > > > > > > > > > > > > 这个也不绝对。大部分时候是。 > > > > > > > > > > > 这个取决于这个算子本身是不是会consume > > > > > > > > > > > retraction,目前我好想没见到有算子会消费re
Re: retract的问题
其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话, 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于 if( type='DELETE'){ sum=sum-value } else if(type='INSERT'){ sum=sum+value } 的逻辑呢? 但是在ScalarFunction中,只实现了eval方法,也就是只有 INSERT的那部分相加的逻辑,没有DELETE那部分相减的逻辑。 Benchao Li 于2020年4月23日周四 下午4:33写道: > 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。 > > lec ssmi 于2020年4月23日周四 下午4:29写道: > > > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over > > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。 > > > > Benchao Li 于2020年4月23日周四 下午4:26写道: > > > > > time interval join不允许输入是非append的。 > > > > > > > > > lec ssmi 于2020年4月23日周四 下午4:18写道: > > > > > > > 那如果是两个retract算子后的流进行time interval join, > > > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游? > > > > > > > > Benchao Li 于2020年4月23日周四 下午4:11写道: > > > > > > > > > 内置的*聚合函数*应该是都能处理retract消息的。 > > > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。 > > > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over > > > > window的确是会需要处理retract,除此之外,regular > > > > > group by也需要。 > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午4:05写道: > > > > > > > > > > > 谢谢。 > > > > > > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。 > > > > > > 但是对于Table > > > > > > > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。 > > > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be > implemented > > > > > > for datastream bounded over aggregate 。 是否说只有over > > > window的时候才有retract? > > > > > > > > > > > > > > > > > > > > > > > > > > > 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。 > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午3:59写道: > > > > > > > > > > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。 > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午3:45写道: > > > > > > > > > > > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。 > > > > > > > > > > > > > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午3:39写道: > > > > > > > > > > > > > > > > > Hi lec, > > > > > > > > > > > > > > > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > > > > > > > > > > > > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。 > > > > > > > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early > > fire或者late > > > > > > fire的时候。 > > > > > > > > > 还有些算子本身不会产生,但是会传递,比如calc算子 > > > > > > > > > > > > > > > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > > > > > > > > > > > > > 这个也不绝对。大部分时候是。 > > > > > > > > > 这个取决于这个算子本身是不是会consume > > > > > > > > > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > > > > > > > > > > > > > > 是的。 > > > > > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午3:25写道: > > > > > > > > > > > > > > > > > > > Hi: > > > > > > > > > >有几个问题想咨询下大佬: > > > > > > > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > > > > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > > > > > > > > > > > > > > > > > > > > > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > Benchao Li > > > > > > > > > School of Electronics Engineering and Computer Science, > > Peking > > > > > > > University > > > > > > > > > Tel:+86-15650713730 > > > > > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > Benchao Li > > > > > > > School of Electronics Engineering and Computer Science, Peking > > > > > University > > > > > > > Tel:+86-15650713730 > > > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > Benchao Li > > > > > School of Electronics Engineering and Computer Science, Peking > > > University > > > > > Tel:+86-15650713730 > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > > > > > > > > > > > -- > > > > > > Benchao Li > > > School of Electronics Engineering and Computer Science, Peking > University > > > Tel:+86-15650713730 > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn >
Re: retract的问题
奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over window做的,然后再做的join,然后将join的结果进行tumble window 聚合。 Benchao Li 于2020年4月23日周四 下午4:26写道: > time interval join不允许输入是非append的。 > > > lec ssmi 于2020年4月23日周四 下午4:18写道: > > > 那如果是两个retract算子后的流进行time interval join, > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游? > > > > Benchao Li 于2020年4月23日周四 下午4:11写道: > > > > > 内置的*聚合函数*应该是都能处理retract消息的。 > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。 > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over > > window的确是会需要处理retract,除此之外,regular > > > group by也需要。 > > > > > > lec ssmi 于2020年4月23日周四 下午4:05写道: > > > > > > > 谢谢。 > > > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。 > > > > 但是对于Table > > > > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。 > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be implemented > > > > for datastream bounded over aggregate 。 是否说只有over > window的时候才有retract? > > > > > > > > > > > > > > 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。 > > > > > > > > Benchao Li 于2020年4月23日周四 下午3:59写道: > > > > > > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。 > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午3:45写道: > > > > > > > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。 > > > > > > > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午3:39写道: > > > > > > > > > > > > > Hi lec, > > > > > > > > > > > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > > > > > > > > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。 > > > > > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early fire或者late > > > > fire的时候。 > > > > > > > 还有些算子本身不会产生,但是会传递,比如calc算子 > > > > > > > > > > > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > > > > > > > > > 这个也不绝对。大部分时候是。 > > > > > > > 这个取决于这个算子本身是不是会consume > > > > > > > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。 > > > > > > > > > > > > > > > > > > > > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > > > > > > > > > > 是的。 > > > > > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午3:25写道: > > > > > > > > > > > > > > > Hi: > > > > > > > >有几个问题想咨询下大佬: > > > > > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > > > > > > > > > > > > > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > Benchao Li > > > > > > > School of Electronics Engineering and Computer Science, Peking > > > > > University > > > > > > > Tel:+86-15650713730 > > > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > Benchao Li > > > > > School of Electronics Engineering and Computer Science, Peking > > > University > > > > > Tel:+86-15650713730 > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > > > > > > > > > > > -- > > > > > > Benchao Li > > > School of Electronics Engineering and Computer Science, Peking > University > > > Tel:+86-15650713730 > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn >
Re: retract的问题
那如果是两个retract算子后的流进行time interval join, 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游? Benchao Li 于2020年4月23日周四 下午4:11写道: > 内置的*聚合函数*应该是都能处理retract消息的。 > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。 > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over window的确是会需要处理retract,除此之外,regular > group by也需要。 > > lec ssmi 于2020年4月23日周四 下午4:05写道: > > > 谢谢。 > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。 > > 但是对于Table > > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。 > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be implemented > > for datastream bounded over aggregate 。 是否说只有over window的时候才有retract? > > > > > 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。 > > > > Benchao Li 于2020年4月23日周四 下午3:59写道: > > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。 > > > > > > lec ssmi 于2020年4月23日周四 下午3:45写道: > > > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。 > > > > > > > > > > > > Benchao Li 于2020年4月23日周四 下午3:39写道: > > > > > > > > > Hi lec, > > > > > > > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > > > > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。 > > > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early fire或者late > > fire的时候。 > > > > > 还有些算子本身不会产生,但是会传递,比如calc算子 > > > > > > > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > > > > > 这个也不绝对。大部分时候是。 > > > > > 这个取决于这个算子本身是不是会consume > > > > > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。 > > > > > > > > > > > > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > > > > > > 是的。 > > > > > > > > > > lec ssmi 于2020年4月23日周四 下午3:25写道: > > > > > > > > > > > Hi: > > > > > >有几个问题想咨询下大佬: > > > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > > > > > > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > Benchao Li > > > > > School of Electronics Engineering and Computer Science, Peking > > > University > > > > > Tel:+86-15650713730 > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > > > > > > > > > > > -- > > > > > > Benchao Li > > > School of Electronics Engineering and Computer Science, Peking > University > > > Tel:+86-15650713730 > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn >
Re: retract的问题
谢谢。 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。 但是对于Table API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be implemented for datastream bounded over aggregate 。 是否说只有over window的时候才有retract? 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。 Benchao Li 于2020年4月23日周四 下午3:59写道: > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。 > > lec ssmi 于2020年4月23日周四 下午3:45写道: > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。 > > > > > > Benchao Li 于2020年4月23日周四 下午3:39写道: > > > > > Hi lec, > > > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。 > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early fire或者late fire的时候。 > > > 还有些算子本身不会产生,但是会传递,比如calc算子 > > > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > 这个也不绝对。大部分时候是。 > > > 这个取决于这个算子本身是不是会consume > > > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。 > > > > > > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > > 是的。 > > > > > > lec ssmi 于2020年4月23日周四 下午3:25写道: > > > > > > > Hi: > > > >有几个问题想咨询下大佬: > > > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > > > > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > > > > > > > > > -- > > > > > > Benchao Li > > > School of Electronics Engineering and Computer Science, Peking > University > > > Tel:+86-15650713730 > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn > > > > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn >
Re: retract的问题
这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。 Benchao Li 于2020年4月23日周四 下午3:39写道: > Hi lec, > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。 > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early fire或者late fire的时候。 > 还有些算子本身不会产生,但是会传递,比如calc算子 > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > 这个也不绝对。大部分时候是。 > 这个取决于这个算子本身是不是会consume > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。 > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > 是的。 > > lec ssmi 于2020年4月23日周四 下午3:25写道: > > > Hi: > >有几个问题想咨询下大佬: > > 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? > > 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来? > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn >
retract的问题
Hi: 有几个问题想咨询下大佬: 1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置? 2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
Re: Re: spark代码直接运行至Flink平台
那还不如直接用apache beam直接将这些框架的API全部统一起来。 hsdcl...@163.com 于2020年4月22日周三 上午11:27写道: > 降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java, > spark就像Scala一样 > > > > > hsdcl...@163.com > > 发件人: Jeff Zhang > 发送时间: 2020-04-22 10:52 > 收件人: user-zh > 主题: Re: spark代码直接运行至Flink平台 > 啥目的 ? > > hsdcl...@163.com 于2020年4月22日周三 上午9:49写道: > > > Hi, > > 有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台 > > > > -- > Best Regards > > Jeff Zhang >
Re: 为消息分配时间戳但不想重新分配水印
watermark的重新生成,是将新的watermark也加入到watermark队列,然后选出一个最值,还是将原先的watermark直接丢弃掉,改用新的? taowang 于2020年4月17日周五 上午10:46写道: > 是的,是生效了的。没见到别人这样用过,在我的场景里这个操作是实现了我需要的逻辑,但会不会引起其他问题,我暂时还没有发现。 > > > 原始邮件 > 发件人: lec ssmi > 收件人: flink-user-cn > 发送时间: 2020年4月17日(周五) 09:25 > 主题: Re: 为消息分配时间戳但不想重新分配水印 > > > 请问,你对DataStream重新声明时间列和水印,生效吗? taowang > 于2020年4月16日周四 下午10:49写道: > 嗯嗯,还是十分感谢。 > > 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ > > > > 原始邮件 > 发件人: tison > 收件人: user-zh< > user-zh@flink.apache.org> > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re: > 为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文…
Re: 为消息分配时间戳但不想重新分配水印
请问,你对DataStream重新声明时间列和水印,生效吗? taowang 于2020年4月16日周四 下午10:49写道: > 嗯嗯,还是十分感谢。 > 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ > > > 原始邮件 > 发件人: tison > 收件人: user-zh > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re: 为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文…
Re: 重复声明watermark的问题
谢谢,主要是我再次声明watermark后,再转成table,然后再window操作,就一直报错,现在能确定是时间属性字段的问题。我用的阿里云的blink,他们开发人员说好像不能这么操作。 jun su 于 2020年4月10日周五 23:36写道: > hi, > 1. 以我的理解, 再次下发watermark会覆盖之前的, 所以在经过n个operator后,可以用再次声明watermark的方式来调整. > 2. 如果是DataStream模式下, 两个流join后, 下游收到的watermark是较小的流的watermark, > 如果是stream模式下的sql window join, 那么下游收到的 > watermark是较小一侧表的watermark - sql设定的时间窗口. > > lec ssmi 于2020年4月8日周三 下午2:05写道: > > > 大家好: > > 请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗? > > 比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。 > > 另外,稍微咨询下另外一个问题,两个流join之后,watermark会消失吗?看书上说的是,以两个流最小的watermark(全局最小)为准。 > > 主要是在阿里云Blink上,使用sql进行join后,说的是时间属性字段会消失。有点不明白。 > > > > > -- > Best, > Jun Su >
重复声明watermark的问题
大家好: 请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗? 比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。 另外,稍微咨询下另外一个问题,两个流join之后,watermark会消失吗?看书上说的是,以两个流最小的watermark(全局最小)为准。 主要是在阿里云Blink上,使用sql进行join后,说的是时间属性字段会消失。有点不明白。