回复:flink mysql cdc + hive streaming疑问
hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制 在2020年10月31日 12:06,陈帅 写道: 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[hive_catalog, cdc, team]], fields=[team_id, team_name, create_time, update_time]) 我的问题: 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> kafka,然后kafka -> hive streaming? 谢谢! 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? sql语句如下 CREATE DATABASE IF NOT EXISTS cdc DROP TABLE IF EXISTS cdc.team CREATE TABLE team( team_id BIGINT, team_name STRING, create_time TIMESTAMP, update_time TIMESTAMP, proctime as proctime() ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = 'root', 'database-name' = 'test', 'table-name' = 'team' ) CREATE DATABASE IF NOT EXISTS ods DROP TABLE IF EXISTS ods.team CREATE TABLE ods.team ( team_id BIGINT, team_name STRING, create_time TIMESTAMP, update_time TIMESTAMP, ) PARTITIONED BY ( ts_date STRING, ts_hour STRING, ts_minute STRING, ) STORED AS PARQUET TBLPROPERTIES ( 'sink.partition-commit.trigger' = 'partition-time', 'sink.partition-commit.delay' = '1 min', 'sink.partition-commit.policy.kind' = 'metastore,success-file', 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00' ) INSERT INTO ods.team SELECT team_id, team_name, create_time, update_time, my_date_format(create_time,'-MM-dd', 'Asia/Shanghai'), my_date_format(create_time,'HH', 'Asia/Shanghai'), my_date_format(create_time,'mm', 'Asia/Shanghai') FROM cdc.team
回复:回复: 回复: flink 自定义udf注册后不能使用
我想问一下,这种udf方式,只能写成一个jar上传到集群中解释执行,还是说还可以直接在sql-client中,直接提交sql代码 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年10月16日 15:45,amen...@163.com 写道: 是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug best, amenhub 发件人: 史 正超 发送时间: 2020-10-16 15:26 收件人: user-zh@flink.apache.org 主题: 回复: 回复:回复: flink 自定义udf注册后不能使用 你这样创建试一下,或者换个名字试试 CREATE TEMPORARY SYSTEM FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以 发件人: 奔跑的小飞袁 发送时间: 2020年10月16日 6:47 收件人: user-zh@flink.apache.org 主题: Re: 回复:回复: flink 自定义udf注册后不能使用 是的,是我传参有问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:回复: 回复: flink 自定义udf注册后不能使用
我想问一下,这种udf方式,只能写成一个jar上传到集群中解释执行,还是说还可以直接在sql-client中,直接提交sql代码 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年10月16日 15:45,amen...@163.com 写道: 是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug best, amenhub 发件人: 史 正超 发送时间: 2020-10-16 15:26 收件人: user-zh@flink.apache.org 主题: 回复: 回复:回复: flink 自定义udf注册后不能使用 你这样创建试一下,或者换个名字试试 CREATE TEMPORARY SYSTEM FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以 发件人: 奔跑的小飞袁 发送时间: 2020年10月16日 6:47 收件人: user-zh@flink.apache.org 主题: Re: 回复:回复: flink 自定义udf注册后不能使用 是的,是我传参有问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink webui前端
大家好,请问flink的webui前端实现的源码在哪呀 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制
回复:(无主题)
感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的 best shizk233 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制 在2020年07月21日 15:04,罗显宴 写道: hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了 val result = num.timeWindowAll(Time.seconds(20)) //.trigger(ContinuousEventTimeTrigger.of(Time.seconds(20))) .process(new ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] { private var itemState: MapState[String,Int] = _ override def open(parameters: Configuration): Unit = { itemState = getRuntimeContext.getMapState(new MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int]))) } override def process(context: Context, elements: Iterable[IncreaseNumPerHour], out: Collector[IncreasePerHour]): Unit = { var timestamp:Long = 0L elements.foreach(kv => { itemState.put(kv.category, 1) timestamp = (kv.timestamp/2000+1)*2000 }) import scala.collection.JavaConversions._ out.collect(IncreasePerHour(new Timestamp( timestamp - 1 ).toString,itemState.keys().size)) } }) | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 14:15,罗显宴<15927482...@163.com> 写道: hi, 我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 13:58,shizk233 写道: Hi, 首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。 按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger, 也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。 这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer, 那么timer在12点的时候会输出的结果就是(12点,1)。 如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。 而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。 期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。 我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道: 好的, 输入: 心功能不全和心律失常用药,1,时间戳 心功能不全和心律失常用药,1,时间戳 抗利尿剂,1,时间戳 血管收缩剂,1,时间戳 血管紧张素II受体拮抗剂,1,时间戳 这里的时间戳就是eventtime了 比如前三条是在一个20秒窗口中,所以应该分为两个窗口: 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2 输出4 即输出: 2020-7-20 19:00:00,2 2020-7-20 19:00:20,4 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:37,shizk233 写道: Hi, 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道: hi, CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:10,shizk233 写道: Hi, 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END, 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。 你可以让acc做个累加,然后结果输出里把acc的值带上看看。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道: 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道: 不好意思,刚才发的快,没来得及解释, 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制
回复: (无主题)
hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了 val result = num.timeWindowAll(Time.seconds(20)) //.trigger(ContinuousEventTimeTrigger.of(Time.seconds(20))) .process(new ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] { private var itemState: MapState[String,Int] = _ override def open(parameters: Configuration): Unit = { itemState = getRuntimeContext.getMapState(new MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int]))) } override def process(context: Context, elements: Iterable[IncreaseNumPerHour], out: Collector[IncreasePerHour]): Unit = { var timestamp:Long = 0L elements.foreach(kv => { itemState.put(kv.category, 1) timestamp = (kv.timestamp/2000+1)*2000 }) import scala.collection.JavaConversions._ out.collect(IncreasePerHour(new Timestamp( timestamp - 1 ).toString,itemState.keys().size)) } }) | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 14:15,罗显宴<15927482...@163.com> 写道: hi, 我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 13:58,shizk233 写道: Hi, 首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。 按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger, 也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。 这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer, 那么timer在12点的时候会输出的结果就是(12点,1)。 如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。 而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。 期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。 我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道: 好的, 输入: 心功能不全和心律失常用药,1,时间戳 心功能不全和心律失常用药,1,时间戳 抗利尿剂,1,时间戳 血管收缩剂,1,时间戳 血管紧张素II受体拮抗剂,1,时间戳 这里的时间戳就是eventtime了 比如前三条是在一个20秒窗口中,所以应该分为两个窗口: 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2 输出4 即输出: 2020-7-20 19:00:00,2 2020-7-20 19:00:20,4 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:37,shizk233 写道: Hi, 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道: hi, CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:10,shizk233 写道: Hi, 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END, 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。 你可以让acc做个累加,然后结果输出里把acc的值带上看看。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道: 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道: 不好意思,刚才发的快,没来得及解释, 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制
回复: (无主题)
hi, 我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 13:58,shizk233 写道: Hi, 首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。 按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger, 也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。 这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer, 那么timer在12点的时候会输出的结果就是(12点,1)。 如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。 而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。 期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。 我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道: 好的, 输入: 心功能不全和心律失常用药,1,时间戳 心功能不全和心律失常用药,1,时间戳 抗利尿剂,1,时间戳 血管收缩剂,1,时间戳 血管紧张素II受体拮抗剂,1,时间戳 这里的时间戳就是eventtime了 比如前三条是在一个20秒窗口中,所以应该分为两个窗口: 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2 输出4 即输出: 2020-7-20 19:00:00,2 2020-7-20 19:00:20,4 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:37,shizk233 写道: Hi, 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道: hi, CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:10,shizk233 写道: Hi, 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END, 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。 你可以让acc做个累加,然后结果输出里把acc的值带上看看。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道: 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道: 不好意思,刚才发的快,没来得及解释, 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制
回复: (无主题)
好的, 输入: 心功能不全和心律失常用药,1,时间戳 心功能不全和心律失常用药,1,时间戳 抗利尿剂,1,时间戳 血管收缩剂,1,时间戳 血管紧张素II受体拮抗剂,1,时间戳 这里的时间戳就是eventtime了 比如前三条是在一个20秒窗口中,所以应该分为两个窗口: 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2 输出4 即输出: 2020-7-20 19:00:00,2 2020-7-20 19:00:20,4 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:37,shizk233 写道: Hi, 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道: hi, CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:10,shizk233 写道: Hi, 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END, 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。 你可以让acc做个累加,然后结果输出里把acc的值带上看看。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道: 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道: 不好意思,刚才发的快,没来得及解释, 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制
回复: (无主题)
hi, CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月21日 11:10,shizk233 写道: Hi, 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END, 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。 你可以让acc做个累加,然后结果输出里把acc的值带上看看。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道: 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道: 不好意思,刚才发的快,没来得及解释, 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制
回复: (无主题)
大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道: 不好意思,刚才发的快,没来得及解释, 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制 package com.bupt.main import java.sql.Timestamp import java.util import java.util.Properties import org.apache.flink.api.common.functions.{AggregateFunction, FlatMapFunction, MapFunction, RuntimeContext} import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.flink.util.Collector import org.apache.http.HttpHost import org.elasticsearch.client.Requests case class Drug(id:String,ATCCode:String,MIMSType:String, name:String,other:String, producer:String, retailPrice:String,composition:String, medicineRank:String, timestamp:Long) case class IncreaseNumPerHour(category:String, num:Long,timestamp:Long) case class ItemViewCount(category:String,windowEnd:Long) case class IncreasePerHour(time:String,num:Long) object KafkaToElasticSearch { def main(args: Array[String]): Unit = { // 1. å建æ§è¡ç¯å¢ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val props = new Properties props.put("bootstrap.servers", "master:9092") props.put("zookeeper.connect", "master:2181") props.put("group.id", "drug-group") props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("auto.offset.reset", "earliest") val drugs = env.addSource(new FlinkKafkaConsumer011[String]("drugs", //è¿ä¸ª kafka topic éè¦åä¸é¢çå·¥å ·ç±»ç topic ä¸è´ new SimpleStringSchema, props)).setParallelism(1) .filter(string =>{ val words = string.split(",") words.length == 10 && words(2).length!=0 }) .map(new MapFunction[String,Drug] { override def map(value: String): Drug = { val words = value.split(",") if(words.length!=10) println(words) Drug(words(0),words(1),words(2),words(3),words(4),words(5),words(6),words(7),words(8),words(9).trim.toLong) } } ).assignAscendingTimestamps( _.timestamp ) //drugs.print() val num = drugs.map(drug => { var temp: StringBuilder = new StringBuilder(drug.MIMSType) if (temp.length != 0 && temp.charAt(0) == '-') temp.deleteCharAt(0) if (temp.length != 0 && temp.charAt(temp.length - 1) == ';') temp.deleteCharAt(temp.length - 1) var validateResult: String = null if (temp.length != 0) validateResult = temp.substring(0, temp.indexOf(' ')) IncreaseNumPerHour(validateResult, 1l, drug.timestamp) }) //num.print() // val result = num.keyBy(_.category) .timeWindow(Time.hours(1)) .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20))) .aggregate(new CountAgg,new WindowResult) .keyBy(_.windowEnd) .pr
回复: (无主题)
不好意思,刚才发的快,没来得及解释, 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制
回复: (无主题)
我运行的时候,他直接按1小时窗口输出了,并没有按20秒连续输出递增 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制
回复:(无主题)
好的,谢谢大佬,我用这个试试 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制 在2020年07月20日 15:11,shizk233 写道: Hi, 从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道: > > > 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, > | | > 罗显宴 > | > | > 邮箱:15927482...@163.com > | > 签名由网易邮箱大师定制 > 在2020年7月20日 11:47,shizk233 写道: > Hi, > > 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling > Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 > > Best, > shizk233 > > 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: > > > > 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream > api,希望看到的大佬能帮我解惑一下,谢谢啦 > > | | > 罗显宴 > | > | > 邮箱:15927482...@163.com > | > > 签名由 网易邮箱大师 定制 >
回复: (无主题)
不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制