Re: Re: Flink SQL撤回流问题

2020-09-27 文章 lec ssmi
你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。

kandy.wang  于2020年9月27日周日 下午2:01写道:

>
>
>
>
>
>
> hi
> 你建mysql要指定主键,另外创建flink表时也要指定一下主键
>
> PRIMARY KEY (id) NOT ENFORCED,这样就会根据主键upsert了
>
>
>
>
>
>
>
>
> 在 2020-09-27 13:36:25,"xiao cai"  写道:
>
> >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。
> >这是我很困惑的地方。
> >
> >
> > 原始邮件
> >发件人: lec ssmi
> >收件人: flink-user-cn
> >发送时间: 2020年9月27日(周日) 13:06
> >主题: Re: Flink SQL撤回流问题
> >
> >
> >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai <
> flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka >
> sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink >
> select dt,count(distinct id) from source group by dt; > > >
> 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show
> create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 >
> 发件人: Michael Ran > 收件人: user-zh<
> user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink
> SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" 
> 写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment >
> 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。
>


Re: Flink SQL撤回流问题

2020-09-26 文章 lec ssmi
是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。

xiao cai  于2020年9月27日周日 下午12:08写道:

> 场景如下:
> source table:   kafka
> sink table:   mysql  schem(id, dt, cnt)
>
>
> insert :
> insert into sink
> select dt,count(distinct id) from source group by dt;
>
>
> 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变
> show create table sink可以发现auto_increment在不断的变大。
> 当超过id的取值范围,就会报错了。
>
>
>  原始邮件
> 发件人: Michael Ran
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 11:51
> 主题: Re:Flink SQL撤回流问题
>
>
> 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  写道: >Hi:
> >使用Flink SQL撤回流写入MySQL,表的auto_increment
> 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。


Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-10 文章 lec ssmi
如果只是聚合的结果,像sum这种函数,可以先减去原来的值,然后再加上更新后的值。但如果是count(distinct)呢?还是需要把具体的每个值都存起来把。

Benchao Li  于2020年9月10日周四 下午3:26写道:

> sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。
> 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。
>
> last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。
>
> lec ssmi  于2020年9月10日周四 下午2:35写道:
>
> > 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
> > 感觉底层和 last_value() group by id是一样的。
> >
> > Benchao Li  于2020年9月10日周四 上午10:34写道:
> >
> > >
> > >
> >
> 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。
> > > 如果还有自己的binlog格式,也可以自定义format来实现。
> > >
> > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为
> > > 1. append / update_after 消息会累加到聚合指标上
> > > 2. delete / update_before 消息会从聚合指标上进行retract
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html
> > > [2]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
> > >
> > > 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道:
> > >
> > > > 请问第1点是有实际的案例使用了么?
> > > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录?
> > > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的
> > > > 谢谢.
> > > >
> > > >
> > > >
> > > >
> > > > --原始邮件--
> > > > 发件人:
> > > >   "user-zh"
> > > > <
> > > > libenc...@apache.org;
> > > > 发送时间:2020年9月9日(星期三) 中午1:09
> > > > 收件人:"user-zh" > > >
> > > > 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
> > > >
> > > >
> > > >
> > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩:
> > > > 1. 首先版本是1.11+, 可以直接用binlog
> > > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink
> > > >  内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by
> > > > yyy这种,那这个sum指标会自动做好这件事。
> > > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1]
> > > 将append数据流转成retract数据流,这样下游再用同样的
> > > >  聚合逻辑,效果也是一样的。
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> > > >
> > > >
> > > > xuzh  > > >
> > > >  场景:
> > > >  nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。
> > > >  nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。
> > > >  nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka
> > ,GMV实时统计为1000.
> > > >  nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka.
> > > >  这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp;
> > > >  请问是不是根据 update /delete
> > > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
> > > > 
> > > > 
> > > >  nbsp; 刚入坑实时处理,请大神赐教
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-10 文章 lec ssmi
上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
感觉底层和 last_value() group by id是一样的。

Benchao Li  于2020年9月10日周四 上午10:34写道:

>
> 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。
> 如果还有自己的binlog格式,也可以自定义format来实现。
>
> 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为
> 1. append / update_after 消息会累加到聚合指标上
> 2. delete / update_before 消息会从聚合指标上进行retract
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
>
> 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道:
>
> > 请问第1点是有实际的案例使用了么?
> > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录?
> > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的
> > 谢谢.
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:
> >   "user-zh"
> > <
> > libenc...@apache.org;
> > 发送时间:2020年9月9日(星期三) 中午1:09
> > 收件人:"user-zh" >
> > 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
> >
> >
> >
> > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩:
> > 1. 首先版本是1.11+, 可以直接用binlog
> > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink
> >  内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by
> > yyy这种,那这个sum指标会自动做好这件事。
> > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1]
> 将append数据流转成retract数据流,这样下游再用同样的
> >  聚合逻辑,效果也是一样的。
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> >
> >
> > xuzh  >
> >  场景:
> >  nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。
> >  nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。
> >  nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000.
> >  nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka.
> >  这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp;
> >  请问是不是根据 update /delete
> 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
> > 
> > 
> >  nbsp; 刚入坑实时处理,请大神赐教
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-09 文章 lec ssmi
直接根据订单的id进行retract(使用last_value  group by id
),然后sum就可以了吧。只要你设置的状态保存期是的大于你订单金额的冷却时间就行。

忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道:

> 请问第1点是有实际的案例使用了么?
> 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录?
> 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的
> 谢谢.
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> libenc...@apache.org;
> 发送时间:2020年9月9日(星期三) 中午1:09
> 收件人:"user-zh"
> 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
>
>
>
> 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩:
> 1. 首先版本是1.11+, 可以直接用binlog
> format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink
>  内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by
> yyy这种,那这个sum指标会自动做好这件事。
> 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的
>  聚合逻辑,效果也是一样的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>
>
> xuzh 
>  场景:
>  nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。
>  nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。
>  nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000.
>  nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka.
>  这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp;
>  请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
> 
> 
>  nbsp; 刚入坑实时处理,请大神赐教
>
>
>
> --
>
> Best,
> Benchao Li


Re: FlinkKafkaConsumer问题

2020-09-03 文章 lec ssmi
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level
api。是这个意思吧。

op <520075...@qq.com> 于2020年9月4日周五 上午10:25写道:

>
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> acqua@gmail.com;
> 发送时间:2020年9月3日(星期四) 晚上6:09
> 收件人:"user-zh"
> 主题:Re: FlinkKafkaConsumer问题
>
>
>
> Hi op,
>
> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有
> partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到
> Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
>
> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit
> offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
>
>  在 2020年9月3日,下午3:03,op <520075...@qq.com 写道:
> 
>  nbsp; nbsp; hi,nbsp; nbsp;
> 我对FlinkKafkaConsumer的实现有点迷惑,nbsp; nbsp; 这有两个相同代码的程序:
>  //---
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  Env.setRestartStrategy(RestartStrategies.noRestart())
>  val consumerProps = new Properties()
>  consumerProps.put("bootstrap.servers", brokers)
>  consumerProps.put("group.id", "test1234")
> 
>  val consumer = new FlinkKafkaConsumer[String](topic,new
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  Env.addSource(consumer).print()
> 
> Env.execute()//---我同时启动这两个程序,他们连接相同的集群的topic,
> group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
> 谢谢


Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

2020-05-07 文章 lec ssmi
使用自定义的Table Sink就可以了啊.

Luan Cooper  于2020年5月7日周四 下午8:39写道:

> Hi
>
> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL
> 如下
>
> INSERT INTO sink_es // 将更改同步 upsert 到 ES
> SELECT *
> FROM binlog // mysql 表的 binlog
>
> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
> 但是上面的 SQL 是做不到的,只会一直 Insert
>
> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?
>
> 社区的 FLIP-87
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
>  可以解决这个问题吗?
>
> 感谢
>


Re: 关于撤回流的Watermark问题

2020-04-27 文章 lec ssmi
但是我在DataStreamGroupWindowAggregateBase这个类里面,发现以下两个方法都是true啊。
override def needsUpdatesAsRetraction = true
override def consumesRetractions = true

Benchao Li  于2020年4月28日周二 上午10:19写道:

> Hi lec,
>
> Window Operator目前是不支持retract的输入的。
>
> lec ssmi  于2020年4月28日周二 上午9:45写道:
>
> > Hi:
> >在tableAPI中,带有时间属性的聚合,比如window聚合,对于retract消息的事件延迟怎么处理呢?
> >举个例子,
> >假设上游使用了last_value 操作加over window操作,一直生成一条数据的最新值,然后和另外一个流做join , 再进行
> time
> >
> >
> window聚合操作。现在已经十点,最大延迟为一个小时,这个时候event-time为9点的消息,已经超过了最大延迟,但是在join中仍然生成一条join后的记录(因为join不过滤超时的数据),这条记录会对原先join好的一条记录进行撤回。那这个撤回消息,在到达time
> > window的时候,因为超过了最大延迟,DELELE记录和INSERT记录都不会被处理吗?
> >  谢谢。
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


关于撤回流的Watermark问题

2020-04-27 文章 lec ssmi
Hi:
   在tableAPI中,带有时间属性的聚合,比如window聚合,对于retract消息的事件延迟怎么处理呢?
   举个例子,
   假设上游使用了last_value 操作加over window操作,一直生成一条数据的最新值,然后和另外一个流做join , 再进行 time
window聚合操作。现在已经十点,最大延迟为一个小时,这个时候event-time为9点的消息,已经超过了最大延迟,但是在join中仍然生成一条join后的记录(因为join不过滤超时的数据),这条记录会对原先join好的一条记录进行撤回。那这个撤回消息,在到达time
window的时候,因为超过了最大延迟,DELELE记录和INSERT记录都不会被处理吗?
 谢谢。


Re: 晚于watermark的数据何时被抛弃

2020-04-27 文章 lec ssmi
谢谢回答。
但这样存在一个问题,加入我不使用window,用普通的group by hour
来实现聚合,hour为string类型。我也需要丢弃掉晚于watermark的数据,
sql中,在 TableAggregateFunction   里面是无法操作的。
DataStream 有ProcessFunction,当然是可以实现的。

Benchao Li  于2020年4月27日周一 下午5:47写道:

> Hi,
>
> 你的理解是对的,只有涉及到时间的一些算子才会有可能丢弃迟到的数据,比如典型的就是Window和CEP。
> 像普通的算子Map、Filter这种,不涉及到时间的概念,不会丢弃数据的。
>
> lec ssmi  于2020年4月27日周一 下午5:38写道:
>
> > Hi:
> >   如果有晚于watermark的数据,只有涉及到时间的算子,比如时间窗口,才会自动地过滤掉这些数据吗?或者说其他算子,比如map,join
> > 也会自动过滤掉而不处理?
> >  感觉类似于ProcessFunction,提供了一个获取currentWatermark的方法,是否不处理,都取决于自己的代码逻辑。
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


晚于watermark的数据何时被抛弃

2020-04-27 文章 lec ssmi
Hi:
  如果有晚于watermark的数据,只有涉及到时间的算子,比如时间窗口,才会自动地过滤掉这些数据吗?或者说其他算子,比如map,join
也会自动过滤掉而不处理?
 感觉类似于ProcessFunction,提供了一个获取currentWatermark的方法,是否不处理,都取决于自己的代码逻辑。


TwoPhaseCommitSinkFunction在TableAPI 中的应用

2020-04-26 文章 lec ssmi
Hi:
   将TwoPhaseCommitSinkFunction 在TableAPI中的Sink实现,能够生效吗?看案例都是DataStream
API在使用。
 谢谢。


人为生成retract记录

2020-04-25 文章 lec ssmi
Hi:
假设我现在将上游retract后的结果写入到kafka,然后下游程序消费kafka去做聚合操作。
因为需要利用聚合算子能够自动处理retract的特性,所以需要将kafka的结果封装成带有不同header的row,即组装为INSERT
DELETE UPDATE类型的数据。
有什么办法可以解决吗?
如果将上下游程序合在一起 ,是没问题的,现在的难点就是拆分。
   谢谢。


Re: retract的问题

2020-04-23 文章 lec ssmi
不好意思,刚才看了一下源码:
[image: image.png]
这个是over window的聚合操作。
这个类实现没有实现producesUpdates 和producesRetractions,
而这两个方法的默认值都是False。是否说明,只能有INSERT类型的记录?
如果是的话,不就是说明over window操作的输出是一个Append-only stream?

lec ssmi  于2020年4月23日周四 下午5:13写道:

> 明白了,谢谢。
>
> Benchao Li  于2020年4月23日周四 下午5:08写道:
>
>> 不是这个意思。是说scalar function不需要额外处理,所以它一套逻辑就可以处理两种类型的消息了。
>> 它不需要区分消息类型,只需要处理消息本身(消息类型是在header里)。
>>
>> lec ssmi  于2020年4月23日周四 下午5:00写道:
>>
>> > 那也就是说UDF这种ScalarFunction,是没有办法处理Retract的了?因为它把DELETE记录和INSERT记录都做了相同的操作。
>> >
>> > Benchao Li  于2020年4月23日周四 下午4:54写道:
>> >
>> > > Hi Jingsong,
>> > > 我建了一个jira[1] 来跟踪这个事情。
>> > >
>> > > Hi lec,
>> > > sum函数不属于scalar函数。sum的内置实现是有retract的版本的,参考:IntSumWithRetractAggFunction
>> > > scalar function不需要这样子处理,因为它本身没有状态。scalar
>> > function对于消息的类型是不需要判断的,处理过程都是一样的。
>> > >
>> > > [1] https://issues.apache.org/jira/browse/FLINK-17343
>> > >
>> > > lec ssmi  于2020年4月23日周四 下午4:41写道:
>> > >
>> > > > 其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话,
>> > > > 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于
>> > > >  if( type='DELETE'){
>> > > >  sum=sum-value
>> > > > } else if(type='INSERT'){
>> > > > sum=sum+value
>> > > >}
>> > > >  的逻辑呢?
>> > > >  但是在ScalarFunction中,只实现了eval方法,也就是只有
>> INSERT的那部分相加的逻辑,没有DELETE那部分相减的逻辑。
>> > > >
>> > > > Benchao Li  于2020年4月23日周四 下午4:33写道:
>> > > >
>> > > > > 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。
>> > > > >
>> > > > > lec ssmi  于2020年4月23日周四 下午4:29写道:
>> > > > >
>> > > > > > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over
>> > > > > > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。
>> > > > > >
>> > > > > > Benchao Li  于2020年4月23日周四 下午4:26写道:
>> > > > > >
>> > > > > > > time interval join不允许输入是非append的。
>> > > > > > >
>> > > > > > >
>> > > > > > > lec ssmi  于2020年4月23日周四 下午4:18写道:
>> > > > > > >
>> > > > > > > > 那如果是两个retract算子后的流进行time interval join,
>> > > > > > > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游?
>> > > > > > > >
>> > > > > > > > Benchao Li  于2020年4月23日周四 下午4:11写道:
>> > > > > > > >
>> > > > > > > > > 内置的*聚合函数*应该是都能处理retract消息的。
>> > > > > > > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。
>> > > > > > > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over
>> > > > > > > > window的确是会需要处理retract,除此之外,regular
>> > > > > > > > > group by也需要。
>> > > > > > > > >
>> > > > > > > > > lec ssmi  于2020年4月23日周四
>> 下午4:05写道:
>> > > > > > > > >
>> > > > > > > > > > 谢谢。
>> > > > > > > > > >
>> > > > >
>> 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。
>> > > > > > > > > > 但是对于Table
>> > > > > > > > > >
>> > > > >
>> API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。
>> > > > > > > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be
>> > > > > implemented
>> > > > > > > > > > for  datastream bounded over aggregate  。 是否说只有over
>> > > > > > > window的时候才有retract?
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。
>> > > > > > > > > >
>> > > > > > > > > > Benchao Li  于2020年4月23日周四
>> 下午3:59写道:
>> > > > > > > > > >
>> > > > > > > > > > > 这个暂

Re: retract的问题

2020-04-23 文章 lec ssmi
明白了,谢谢。

Benchao Li  于2020年4月23日周四 下午5:08写道:

> 不是这个意思。是说scalar function不需要额外处理,所以它一套逻辑就可以处理两种类型的消息了。
> 它不需要区分消息类型,只需要处理消息本身(消息类型是在header里)。
>
> lec ssmi  于2020年4月23日周四 下午5:00写道:
>
> > 那也就是说UDF这种ScalarFunction,是没有办法处理Retract的了?因为它把DELETE记录和INSERT记录都做了相同的操作。
> >
> > Benchao Li  于2020年4月23日周四 下午4:54写道:
> >
> > > Hi Jingsong,
> > > 我建了一个jira[1] 来跟踪这个事情。
> > >
> > > Hi lec,
> > > sum函数不属于scalar函数。sum的内置实现是有retract的版本的,参考:IntSumWithRetractAggFunction
> > > scalar function不需要这样子处理,因为它本身没有状态。scalar
> > function对于消息的类型是不需要判断的,处理过程都是一样的。
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-17343
> > >
> > > lec ssmi  于2020年4月23日周四 下午4:41写道:
> > >
> > > > 其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话,
> > > > 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于
> > > >  if( type='DELETE'){
> > > >  sum=sum-value
> > > > } else if(type='INSERT'){
> > > > sum=sum+value
> > > >}
> > > >  的逻辑呢?
> > > >  但是在ScalarFunction中,只实现了eval方法,也就是只有
> INSERT的那部分相加的逻辑,没有DELETE那部分相减的逻辑。
> > > >
> > > > Benchao Li  于2020年4月23日周四 下午4:33写道:
> > > >
> > > > > 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。
> > > > >
> > > > > lec ssmi  于2020年4月23日周四 下午4:29写道:
> > > > >
> > > > > > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over
> > > > > > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。
> > > > > >
> > > > > > Benchao Li  于2020年4月23日周四 下午4:26写道:
> > > > > >
> > > > > > > time interval join不允许输入是非append的。
> > > > > > >
> > > > > > >
> > > > > > > lec ssmi  于2020年4月23日周四 下午4:18写道:
> > > > > > >
> > > > > > > > 那如果是两个retract算子后的流进行time interval join,
> > > > > > > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游?
> > > > > > > >
> > > > > > > > Benchao Li  于2020年4月23日周四 下午4:11写道:
> > > > > > > >
> > > > > > > > > 内置的*聚合函数*应该是都能处理retract消息的。
> > > > > > > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。
> > > > > > > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over
> > > > > > > > window的确是会需要处理retract,除此之外,regular
> > > > > > > > > group by也需要。
> > > > > > > > >
> > > > > > > > > lec ssmi  于2020年4月23日周四 下午4:05写道:
> > > > > > > > >
> > > > > > > > > > 谢谢。
> > > > > > > > > >
> > > > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。
> > > > > > > > > > 但是对于Table
> > > > > > > > > >
> > > > > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。
> > > > > > > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be
> > > > > implemented
> > > > > > > > > > for  datastream bounded over aggregate  。 是否说只有over
> > > > > > > window的时候才有retract?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。
> > > > > > > > > >
> > > > > > > > > > Benchao Li  于2020年4月23日周四 下午3:59写道:
> > > > > > > > > >
> > > > > > > > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。
> > > > > > > > > > >
> > > > > > > > > > > lec ssmi  于2020年4月23日周四
> > 下午3:45写道:
> > > > > > > > > > >
> > > > > > > > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Benchao Li  于2020年4月23日周四
> > 下午3:39写道:
> > > > > > > > > > > >
> &g

Re: retract的问题

2020-04-23 文章 lec ssmi
那也就是说UDF这种ScalarFunction,是没有办法处理Retract的了?因为它把DELETE记录和INSERT记录都做了相同的操作。

Benchao Li  于2020年4月23日周四 下午4:54写道:

> Hi Jingsong,
> 我建了一个jira[1] 来跟踪这个事情。
>
> Hi lec,
> sum函数不属于scalar函数。sum的内置实现是有retract的版本的,参考:IntSumWithRetractAggFunction
> scalar function不需要这样子处理,因为它本身没有状态。scalar function对于消息的类型是不需要判断的,处理过程都是一样的。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17343
>
> lec ssmi  于2020年4月23日周四 下午4:41写道:
>
> > 其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话,
> > 我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于
> >  if( type='DELETE'){
> >  sum=sum-value
> > } else if(type='INSERT'){
> > sum=sum+value
> >}
> >  的逻辑呢?
> >  但是在ScalarFunction中,只实现了eval方法,也就是只有 INSERT的那部分相加的逻辑,没有DELETE那部分相减的逻辑。
> >
> > Benchao Li  于2020年4月23日周四 下午4:33写道:
> >
> > > 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。
> > >
> > > lec ssmi  于2020年4月23日周四 下午4:29写道:
> > >
> > > > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over
> > > > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。
> > > >
> > > > Benchao Li  于2020年4月23日周四 下午4:26写道:
> > > >
> > > > > time interval join不允许输入是非append的。
> > > > >
> > > > >
> > > > > lec ssmi  于2020年4月23日周四 下午4:18写道:
> > > > >
> > > > > > 那如果是两个retract算子后的流进行time interval join,
> > > > > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游?
> > > > > >
> > > > > > Benchao Li  于2020年4月23日周四 下午4:11写道:
> > > > > >
> > > > > > > 内置的*聚合函数*应该是都能处理retract消息的。
> > > > > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。
> > > > > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over
> > > > > > window的确是会需要处理retract,除此之外,regular
> > > > > > > group by也需要。
> > > > > > >
> > > > > > > lec ssmi  于2020年4月23日周四 下午4:05写道:
> > > > > > >
> > > > > > > > 谢谢。
> > > > > > > >
> > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。
> > > > > > > > 但是对于Table
> > > > > > > >
> > > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。
> > > > > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be
> > > implemented
> > > > > > > > for  datastream bounded over aggregate  。 是否说只有over
> > > > > window的时候才有retract?
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。
> > > > > > > >
> > > > > > > > Benchao Li  于2020年4月23日周四 下午3:59写道:
> > > > > > > >
> > > > > > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。
> > > > > > > > >
> > > > > > > > > lec ssmi  于2020年4月23日周四 下午3:45写道:
> > > > > > > > >
> > > > > > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Benchao Li  于2020年4月23日周四 下午3:39写道:
> > > > > > > > > >
> > > > > > > > > > > Hi lec,
> > > > > > > > > > >
> > > > > > > > > > >  1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > > > > > > > > > >
> > > > > > > > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。
> > > > > > > > > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early
> > > > fire或者late
> > > > > > > > fire的时候。
> > > > > > > > > > > 还有些算子本身不会产生,但是会传递,比如calc算子
> > > > > > > > > > >
> > > > > > > > > > >  2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > > > > > > > > > >
> > > > > > > > > > > 这个也不绝对。大部分时候是。
> > > > > > > > > > > 这个取决于这个算子本身是不是会consume
> > > > > > > > > > > retraction,目前我好想没见到有算子会消费re

Re: retract的问题

2020-04-23 文章 lec ssmi
其实我想说,如果说sql内置的算子,包括UDF这种ScalarFunction默认都是能够处理retract的话,
我们举一个最简单的例子:sum函数,那内部实现是否需要具有一个类似于
 if( type='DELETE'){
 sum=sum-value
} else if(type='INSERT'){
sum=sum+value
   }
 的逻辑呢?
 但是在ScalarFunction中,只实现了eval方法,也就是只有 INSERT的那部分相加的逻辑,没有DELETE那部分相减的逻辑。

Benchao Li  于2020年4月23日周四 下午4:33写道:

> 阿里云上提供的Blink应该是内部版本,跟社区版本有些不一样。我刚才说的都是基于社区版本的。
>
> lec ssmi  于2020年4月23日周四 下午4:29写道:
>
> > 奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over
> > window做的,然后再做的join,然后将join的结果进行tumble window 聚合。
> >
> > Benchao Li  于2020年4月23日周四 下午4:26写道:
> >
> > > time interval join不允许输入是非append的。
> > >
> > >
> > > lec ssmi  于2020年4月23日周四 下午4:18写道:
> > >
> > > > 那如果是两个retract算子后的流进行time interval join,
> > > > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游?
> > > >
> > > > Benchao Li  于2020年4月23日周四 下午4:11写道:
> > > >
> > > > > 内置的*聚合函数*应该是都能处理retract消息的。
> > > > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。
> > > > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over
> > > > window的确是会需要处理retract,除此之外,regular
> > > > > group by也需要。
> > > > >
> > > > > lec ssmi  于2020年4月23日周四 下午4:05写道:
> > > > >
> > > > > > 谢谢。
> > > > > >
> 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。
> > > > > > 但是对于Table
> > > > > >
> API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。
> > > > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be
> implemented
> > > > > > for  datastream bounded over aggregate  。 是否说只有over
> > > window的时候才有retract?
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。
> > > > > >
> > > > > > Benchao Li  于2020年4月23日周四 下午3:59写道:
> > > > > >
> > > > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。
> > > > > > >
> > > > > > > lec ssmi  于2020年4月23日周四 下午3:45写道:
> > > > > > >
> > > > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。
> > > > > > > >
> > > > > > > >
> > > > > > > > Benchao Li  于2020年4月23日周四 下午3:39写道:
> > > > > > > >
> > > > > > > > > Hi lec,
> > > > > > > > >
> > > > > > > > >  1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > > > > > > > >
> > > > > > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。
> > > > > > > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early
> > fire或者late
> > > > > > fire的时候。
> > > > > > > > > 还有些算子本身不会产生,但是会传递,比如calc算子
> > > > > > > > >
> > > > > > > > >  2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > > > > > > > >
> > > > > > > > > 这个也不绝对。大部分时候是。
> > > > > > > > > 这个取决于这个算子本身是不是会consume
> > > > > > > > > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > >
> 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> > > > > > > > >
> > > > > > > > > 是的。
> > > > > > > > >
> > > > > > > > > lec ssmi  于2020年4月23日周四 下午3:25写道:
> > > > > > > > >
> > > > > > > > > > Hi:
> > > > > > > > > >有几个问题想咨询下大佬:
> > > > > > > > > >   1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > > > > > > > > >   2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Benchao Li
> > > > > > > > > School of Electronics Engineering and Computer Science,
> > Peking
> > > > > > > University
> > > > > > > > > Tel:+86-15650713730
> > > > > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Benchao Li
> > > > > > > School of Electronics Engineering and Computer Science, Peking
> > > > > University
> > > > > > > Tel:+86-15650713730
> > > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Benchao Li
> > > > > School of Electronics Engineering and Computer Science, Peking
> > > University
> > > > > Tel:+86-15650713730
> > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Benchao Li
> > > School of Electronics Engineering and Computer Science, Peking
> University
> > > Tel:+86-15650713730
> > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: retract的问题

2020-04-23 文章 lec ssmi
奇怪,目前我们使用阿里云的Blink,使用了join前的两个流,都是通过last_value 加上over
window做的,然后再做的join,然后将join的结果进行tumble window 聚合。

Benchao Li  于2020年4月23日周四 下午4:26写道:

> time interval join不允许输入是非append的。
>
>
> lec ssmi  于2020年4月23日周四 下午4:18写道:
>
> > 那如果是两个retract算子后的流进行time interval join,
> > 已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游?
> >
> > Benchao Li  于2020年4月23日周四 下午4:11写道:
> >
> > > 内置的*聚合函数*应该是都能处理retract消息的。
> > > 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。
> > > 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over
> > window的确是会需要处理retract,除此之外,regular
> > > group by也需要。
> > >
> > > lec ssmi  于2020年4月23日周四 下午4:05写道:
> > >
> > > > 谢谢。
> > > > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。
> > > > 但是对于Table
> > > > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。
> > > > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be implemented
> > > > for  datastream bounded over aggregate  。 是否说只有over
> window的时候才有retract?
> > > >
> > > >
> > >
> >
> 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。
> > > >
> > > > Benchao Li  于2020年4月23日周四 下午3:59写道:
> > > >
> > > > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。
> > > > >
> > > > > lec ssmi  于2020年4月23日周四 下午3:45写道:
> > > > >
> > > > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。
> > > > > >
> > > > > >
> > > > > > Benchao Li  于2020年4月23日周四 下午3:39写道:
> > > > > >
> > > > > > > Hi lec,
> > > > > > >
> > > > > > >  1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > > > > > >
> > > > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。
> > > > > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early fire或者late
> > > > fire的时候。
> > > > > > > 还有些算子本身不会产生,但是会传递,比如calc算子
> > > > > > >
> > > > > > >  2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > > > > > >
> > > > > > > 这个也不绝对。大部分时候是。
> > > > > > > 这个取决于这个算子本身是不是会consume
> > > > > > > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。
> > > > > > >
> > > > > > >
> > > > >
> > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> > > > > > >
> > > > > > > 是的。
> > > > > > >
> > > > > > > lec ssmi  于2020年4月23日周四 下午3:25写道:
> > > > > > >
> > > > > > > > Hi:
> > > > > > > >有几个问题想咨询下大佬:
> > > > > > > >   1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > > > > > > >   2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > > > > > > >
> > > > > >
> > > >
> 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Benchao Li
> > > > > > > School of Electronics Engineering and Computer Science, Peking
> > > > > University
> > > > > > > Tel:+86-15650713730
> > > > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Benchao Li
> > > > > School of Electronics Engineering and Computer Science, Peking
> > > University
> > > > > Tel:+86-15650713730
> > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Benchao Li
> > > School of Electronics Engineering and Computer Science, Peking
> University
> > > Tel:+86-15650713730
> > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: retract的问题

2020-04-23 文章 lec ssmi
那如果是两个retract算子后的流进行time interval join,
已经join成功并且发送出去的记录,也会先DELETE掉,再INSERT,然后将这两条记录发送下游?

Benchao Li  于2020年4月23日周四 下午4:11写道:

> 内置的*聚合函数*应该是都能处理retract消息的。
> 普通的*scalar函数*不需要特殊处理,retract和append消息对它来说都是一样的。
> 我理解应该主要是UDAF可能需要注意一下是否需要处理retract消息,over window的确是会需要处理retract,除此之外,regular
> group by也需要。
>
> lec ssmi  于2020年4月23日周四 下午4:05写道:
>
> > 谢谢。
> > 其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。
> > 但是对于Table
> > API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。
> > 我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be implemented
> > for  datastream bounded over aggregate  。 是否说只有over window的时候才有retract?
> >
> >
> 另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。
> >
> > Benchao Li  于2020年4月23日周四 下午3:59写道:
> >
> > > 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。
> > >
> > > lec ssmi  于2020年4月23日周四 下午3:45写道:
> > >
> > > > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。
> > > >
> > > >
> > > > Benchao Li  于2020年4月23日周四 下午3:39写道:
> > > >
> > > > > Hi lec,
> > > > >
> > > > >  1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > > > >
> > > > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。
> > > > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early fire或者late
> > fire的时候。
> > > > > 还有些算子本身不会产生,但是会传递,比如calc算子
> > > > >
> > > > >  2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > > > >
> > > > > 这个也不绝对。大部分时候是。
> > > > > 这个取决于这个算子本身是不是会consume
> > > > > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。
> > > > >
> > > > >
> > > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> > > > >
> > > > > 是的。
> > > > >
> > > > > lec ssmi  于2020年4月23日周四 下午3:25写道:
> > > > >
> > > > > > Hi:
> > > > > >有几个问题想咨询下大佬:
> > > > > >   1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > > > > >   2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > > > > >
> > > >
> > 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Benchao Li
> > > > > School of Electronics Engineering and Computer Science, Peking
> > > University
> > > > > Tel:+86-15650713730
> > > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Benchao Li
> > > School of Electronics Engineering and Computer Science, Peking
> University
> > > Tel:+86-15650713730
> > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: retract的问题

2020-04-23 文章 lec ssmi
谢谢。
其实,如果从DataStream编程的角度上来说,下游是能够收到一个Tuple2类型的数据,也就是能够硬编码处理retract的结果。
但是对于Table
API来说,特别是SQL,内置函数本身并没有一个增加处理Retract的逻辑(当然,可能内置算子已经包含了,我没有去看而已)。
我在编写UDAF的时候,里面有个retract方法,注释写的是: This function must be implemented
for  datastream bounded over aggregate  。 是否说只有over window的时候才有retract?
另外,对于我们写的UDF,UDTF,其实也没有提供retract的方式,毕竟传入的参数只是字段值,而没有DataStream中的Tuple2中的Boolean值。其他的内置方法也一样,好像对于retract的处理,sql中只有UDAF里面有所提及。

Benchao Li  于2020年4月23日周四 下午3:59写道:

> 这个暂时还没有一篇文档来介绍这部分内容。如果你要了解全部细节,可能只能从源码的角度来了解了。
>
> lec ssmi  于2020年4月23日周四 下午3:45写道:
>
> > 这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。
> >
> >
> > Benchao Li  于2020年4月23日周四 下午3:39写道:
> >
> > > Hi lec,
> > >
> > >  1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > >
> > > 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。
> > > 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early fire或者late fire的时候。
> > > 还有些算子本身不会产生,但是会传递,比如calc算子
> > >
> > >  2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > >
> > > 这个也不绝对。大部分时候是。
> > > 这个取决于这个算子本身是不是会consume
> > > retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。
> > >
> > >
> 3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> > >
> > > 是的。
> > >
> > > lec ssmi  于2020年4月23日周四 下午3:25写道:
> > >
> > > > Hi:
> > > >有几个问题想咨询下大佬:
> > > >   1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> > > >   2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> > > >
> >  3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> > > >
> > >
> > >
> > > --
> > >
> > > Benchao Li
> > > School of Electronics Engineering and Computer Science, Peking
> University
> > > Tel:+86-15650713730
> > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: retract的问题

2020-04-23 文章 lec ssmi
这个难道没有一个列表,或者是配置开关之类的吗?难道只能一个一个地尝试?各种算子连接在一起,更难判断了。


Benchao Li  于2020年4月23日周四 下午3:39写道:

> Hi lec,
>
>  1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
>
> 这个是某些算子会有这个行为,比如普通的group by,就会发送retract消息。
> 另外有一些算子是在某些特定配置下才会有这个行为,比如window operator,在配置了early fire或者late fire的时候。
> 还有些算子本身不会产生,但是会传递,比如calc算子
>
>  2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
>
> 这个也不绝对。大部分时候是。
> 这个取决于这个算子本身是不是会consume
> retraction,目前我好想没见到有算子会消费retraction,但是不产生retraction的。
>
>  3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
>
> 是的。
>
> lec ssmi  于2020年4月23日周四 下午3:25写道:
>
> > Hi:
> >有几个问题想咨询下大佬:
> >   1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
> >   2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
> >   3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


retract的问题

2020-04-23 文章 lec ssmi
Hi:
   有几个问题想咨询下大佬:
  1.retract在什么时候触发呢?是有groupby或者窗口就默认retract吗,还是需要配置?
  2.如果上游操作有retract,那么不是所有的下游都带有retract性质了?不然下游计算的数据就不准了。
  3.sql操作的话,如果上游是有retract的,那下游select然后print,会把DELETE和INSERT这两条记录都print出来?


Re: Re: spark代码直接运行至Flink平台

2020-04-21 文章 lec ssmi
那还不如直接用apache beam直接将这些框架的API全部统一起来。

hsdcl...@163.com  于2020年4月22日周三 上午11:27写道:

> 降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java,
> spark就像Scala一样
>
>
>
>
> hsdcl...@163.com
>
> 发件人: Jeff Zhang
> 发送时间: 2020-04-22 10:52
> 收件人: user-zh
> 主题: Re: spark代码直接运行至Flink平台
> 啥目的 ?
>
> hsdcl...@163.com  于2020年4月22日周三 上午9:49写道:
>
> >   Hi,
> >   有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: 为消息分配时间戳但不想重新分配水印

2020-04-17 文章 lec ssmi
watermark的重新生成,是将新的watermark也加入到watermark队列,然后选出一个最值,还是将原先的watermark直接丢弃掉,改用新的?

taowang  于2020年4月17日周五 上午10:46写道:

> 是的,是生效了的。没见到别人这样用过,在我的场景里这个操作是实现了我需要的逻辑,但会不会引起其他问题,我暂时还没有发现。
>
>
>  原始邮件
> 发件人: lec ssmi
> 收件人: flink-user-cn
> 发送时间: 2020年4月17日(周五) 09:25
> 主题: Re: 为消息分配时间戳但不想重新分配水印
>
>
> 请问,你对DataStream重新声明时间列和水印,生效吗? taowang 
> 于2020年4月16日周四 下午10:49写道: > 嗯嗯,还是十分感谢。 >
> 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ >
> > > 原始邮件 > 发件人: tison > 收件人: user-zh<
> user-zh@flink.apache.org> > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re:
> 为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文…


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 lec ssmi
请问,你对DataStream重新声明时间列和水印,生效吗?

taowang  于2020年4月16日周四 下午10:49写道:

> 嗯嗯,还是十分感谢。
> 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。
>
>
> 打扰各位了,祝好!~
>
>
>  原始邮件
> 发件人: tison
> 收件人: user-zh
> 发送时间: 2020年4月16日(周四) 22:39
> 主题: Re: 为消息分配时间戳但不想重新分配水印
>
>
> 正在载入邮件原文…


Re: 重复声明watermark的问题

2020-04-10 文章 lec ssmi
谢谢,主要是我再次声明watermark后,再转成table,然后再window操作,就一直报错,现在能确定是时间属性字段的问题。我用的阿里云的blink,他们开发人员说好像不能这么操作。

jun su  于 2020年4月10日周五 23:36写道:

> hi,
> 1. 以我的理解, 再次下发watermark会覆盖之前的, 所以在经过n个operator后,可以用再次声明watermark的方式来调整.
> 2. 如果是DataStream模式下, 两个流join后, 下游收到的watermark是较小的流的watermark,
> 如果是stream模式下的sql window join, 那么下游收到的
> watermark是较小一侧表的watermark - sql设定的时间窗口.
>
> lec ssmi  于2020年4月8日周三 下午2:05写道:
>
> > 大家好:
> >   请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗?
> >   比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。
> >   另外,稍微咨询下另外一个问题,两个流join之后,watermark会消失吗?看书上说的是,以两个流最小的watermark(全局最小)为准。
> >   主要是在阿里云Blink上,使用sql进行join后,说的是时间属性字段会消失。有点不明白。
> >
>
>
> --
> Best,
> Jun Su
>


重复声明watermark的问题

2020-04-08 文章 lec ssmi
大家好:
  请问,对一个DataStream重复声明watermark,前面的watermark会被覆盖掉吗?
  比如我再source端声明了watermark,进行了一系列操作后,我觉得watermark的延迟不满足需求,就再次声明一次。
  另外,稍微咨询下另外一个问题,两个流join之后,watermark会消失吗?看书上说的是,以两个流最小的watermark(全局最小)为准。
  主要是在阿里云Blink上,使用sql进行join后,说的是时间属性字段会消失。有点不明白。