Re: (无主题)

2024-03-12 文章 Xuannan Su
请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 地址来取消订阅来自
 user-zh@flink.apache.org 
邮件组的邮件,你可以参考[1][2]
管理你的邮件订阅。

Best,
Xuannan

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On Tue, Mar 12, 2024 at 2:04 PM willluzheng  wrote:
>
> 退订


(无主题)

2024-03-11 文章 willluzheng
退订

回复:(无主题)

2023-06-15 文章 海风
多谢多谢



 回复的原邮件 
| 发件人 | Weihua Hu |
| 日期 | 2023年06月14日 12:32 |
| 收件人 | user-zh@flink.apache.org |
| 抄送至 | |
| 主题 | Re: (无主题) |
>
> 这个状态变量是否需要用transient来修饰

ValueState 再 Rich fuction 的 open 方法中被初始化,不应该被序列化和反序列化,建议使用 transient 来修饰。
但实际上自定义函数的序列化、反序列化只在任务部署阶段执行,而且初始状态下 ValueState 的值是 null,所以不使用 transient
关键字也不会有太大的影响。

以及什么情况下flink代码中需要用transient来修饰变量,什么情况下不用transient来修饰

理解自定义函数的序列化、反序列化是在任务部署阶段执行之后,这个问题就比较好回答了。 如果你的变量在是函数的 open 方法内初始化的,那应该增加
transient 关键字来表明该字段不需要参与序列化


Best,
Weihua


On Tue, Jun 13, 2023 at 1:10 PM Paul <18751805...@163.com> wrote:

> 在flink处理函数中定义一个状态变量,比如private ValueState
> vs;这个状态变量是否需要用transient来修饰,为什么呢?以及什么情况下flink代码中需要用transient来修饰变量,什么情况下不用transient来修饰?请大家指教
>
>
>


Re: (无主题)

2023-06-13 文章 Weihua Hu
>
> 这个状态变量是否需要用transient来修饰

ValueState 再 Rich fuction 的 open 方法中被初始化,不应该被序列化和反序列化,建议使用 transient 来修饰。
但实际上自定义函数的序列化、反序列化只在任务部署阶段执行,而且初始状态下 ValueState 的值是 null,所以不使用 transient
关键字也不会有太大的影响。

以及什么情况下flink代码中需要用transient来修饰变量,什么情况下不用transient来修饰

理解自定义函数的序列化、反序列化是在任务部署阶段执行之后,这个问题就比较好回答了。 如果你的变量在是函数的 open 方法内初始化的,那应该增加
transient 关键字来表明该字段不需要参与序列化


Best,
Weihua


On Tue, Jun 13, 2023 at 1:10 PM Paul <18751805...@163.com> wrote:

> 在flink处理函数中定义一个状态变量,比如private ValueState
> vs;这个状态变量是否需要用transient来修饰,为什么呢?以及什么情况下flink代码中需要用transient来修饰变量,什么情况下不用transient来修饰?请大家指教
>
>
>


(无主题)

2023-06-12 文章 Paul
在flink处理函数中定义一个状态变量,比如private ValueState 
vs;这个状态变量是否需要用transient来修饰,为什么呢?以及什么情况下flink代码中需要用transient来修饰变量,什么情况下不用transient来修饰?请大家指教




(无主题)

2023-03-22 文章 李朋


(无主题)

2022-12-07 文章 zhufangliang
退订




(无主题)

2022-10-07 文章 陈鑫
退订



(无主题)

2022-06-12 文章 李国辉

退订

Re:(无主题)

2022-06-09 文章 Xuyang



Hi, 退订请发送任意消息至 user-zh-unsubscr...@flink.apache.org










--

Best!
Xuyang





在 2022-06-09 16:12:11,"zehir.tong"  写道:
>退订


(无主题)

2022-06-09 文章 zehir.tong
退订

(无主题)

2022-06-09 文章 zehir.tong
退订

Re: (无主题)

2022-04-15 文章 Shengkai Fang
退订的同学,请发送任意邮件到 user-zh-unsubscr...@flink.apache.org。

BEST,
Shengkai

王健 <13166339...@163.com> 于2022年4月15日周五 14:40写道:

> 退订


(无主题)

2022-04-14 文章 王健
退订

(无主题)

2022-01-11 文章 生如夏花
退订

(无主题)

2021-10-20 文章 TROY

退订

Re: <无主题>

2021-09-07 文章 Caizhi Weng
Hi!

退订中文邮件列表请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org,其他邮件列表退订邮箱参见
https://flink.apache.org/community.html#mailing-lists

戴巍(R&D)  于2021年9月7日周二 下午2:36写道:

> 退订
>


<无主题>

2021-09-06 文章
退订


Re: (无主题)

2021-08-05 文章 Choi Steven
退订


On Wed, 4 Aug 2021 at 18:28, 洗你的头 <1264386...@qq.com.invalid> wrote:

> 退订


(无主题)

2021-08-04 文章 洗你的头
退订

Re:Re: (无主题)

2021-06-21 文章 田磊
用的是perjob的模式,版本是1.12.1,因为我读的是hbase的数据,用的流的模式,是不是数据读完之后任务不会结束啊。下面是我的提交命令:./flink-1.12.1/bin/flink
 run -t yarn-per-job -m yarn-cluster -ynm historyPhotosConstientHandle -ys 8 
-yjm 4096 -ytm 10240 -c 
com.chinaunicom.audit.photo.history.handle.AuditPhotoHistoryNumberHandleApp 
/mnt/sd02/tglf005/devsoft/data-audit-photo-history-handle-2.1.jar,我发现如果指定了perjob参数的话,那么你在命令中指定的slot数量不起作用,我代码的最大并行度为16,按理来说应该分配2个taskmanager,但是却是分配了16个,每个taskmanager1个slot。如果不指定
 -t yarn-per-job的话,那么提交后就显示的是2个taskmanager,不是很明白,小白一个,望大家指点一二。
在 2021-06-21 18:26:33,"刘建刚"  写道:
>嗨,你使用的是session还是perJob的作业?哪个flink版本?有详细的日志吗?一般不退户,可能是master卡在了哪里,比如我们遇到过卡在handler或者异步执行有问题。
>
>田磊  于2021年6月20日周日 上午11:14写道:
>
>>
>> 我用flink跑hbase的数据,flink的界面显示任务已经finished,正在running的任务为0。而yarn的界面显示正在running的状态,一直都结束不了,需要手动kill,是什么情况啊。
>>
>>
>> | |
>> totorobabyfans
>> |
>> |
>> 邮箱:totorobabyf...@163.com
>> |
>>
>> 签名由 网易邮箱大师 定制


Re: (无主题)

2021-06-21 文章 刘建刚
嗨,你使用的是session还是perJob的作业?哪个flink版本?有详细的日志吗?一般不退户,可能是master卡在了哪里,比如我们遇到过卡在handler或者异步执行有问题。

田磊  于2021年6月20日周日 上午11:14写道:

>
> 我用flink跑hbase的数据,flink的界面显示任务已经finished,正在running的任务为0。而yarn的界面显示正在running的状态,一直都结束不了,需要手动kill,是什么情况啊。
>
>
> | |
> totorobabyfans
> |
> |
> 邮箱:totorobabyf...@163.com
> |
>
> 签名由 网易邮箱大师 定制


Re: (无主题)

2021-06-20 文章 todd
如果使用flink run 指令启动任务,加-d 指令。作业执行结束后会退出app。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

(无主题)

2021-06-19 文章 田磊
我用flink跑hbase的数据,flink的界面显示任务已经finished,正在running的任务为0。而yarn的界面显示正在running的状态,一直都结束不了,需要手动kill,是什么情况啊。


| |
totorobabyfans
|
|
邮箱:totorobabyf...@163.com
|

签名由 网易邮箱大师 定制

Re: (无主题)

2021-06-14 文章 Jingsong Li
发送到 user-zh-unsubscr...@flink.apache.org 而不是 user-zh@flink.apache.org

Best,
Jingsong

On Tue, Jun 15, 2021 at 12:32 PM 1049961436 <1049961...@qq.com> wrote:

> 退订



-- 
Best, Jingsong Lee


(无主题)

2021-06-14 文章 1049961436
退订

(无主题)

2021-06-02 文章 田磊
 您好:
在一个app应用程序中,如果我用flink自定义source读hbase里面的表,处理完之后再通过sink更新到hbase里面,这样会不会出现冲突,有没有可能从source里面来的数据是已经处理过的数据。还有一种情况是第一个程序走完一套逻辑将数据更新到hbase中,同时另外一套程序从这张表中自定义source将数据再更新到该表中,会不会出现冲突呢。


| |
totorobabyfans
|
|
邮箱:totorobabyf...@163.com
|

签名由 网易邮箱大师 定制

(无主题)

2021-05-30 文章 Hongyuan Ma
退订



(无主题)

2021-05-30 文章 陈海剑
退订


| |
陈海剑
|
|
邮箱:13469958...@163.com
best regards
|

签名由 网易邮箱大师 定制

(无主题)

2021-05-18 文章 maozhaolin
在yarn模式下提交flink任务总是报没有这个东西是为啥啊
org.apache.flink.metrics.MetricGroup.addGroup(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/flink/metrics/MetricGroup;
| |
mao18698726900
|
|
邮箱:mao18698726...@163.com
|

签名由 网易邮箱大师 定制

(无主题)

2021-04-19 文章 小屁孩
退订

(无主题)

2021-04-16 文章 maozhaolin
退订


| |
mao18698726900
|
|
邮箱:mao18698726...@163.com
|

签名由 网易邮箱大师 定制

Re: (无主题)

2021-04-06 文章 Gauler Tan
退订

guoyb <861277...@qq.com> 于2021年4月6日周二 下午5:57写道:

> To remove your address from the list, send a message to:
>       
>
>
> ---原始邮件---
> 发件人: "郭华威" 发送时间: 2021年4月6日(周二) 下午5:52
> 收件人: "user-zh@flink.apache.org" 主题: 回复:(无主题)
>
>
> 请问如何退订
>
> 在2021年04月06日 16:37,郭华威 写道:
> 退订


回复:(无主题)

2021-04-06 文章 guoyb
To remove your address from the list, send a message to:
      

回复:(无主题)

2021-04-06 文章 郭华威
请问如何退订

在2021年04月06日 16:37,郭华威 写道:
退订

(无主题)

2021-04-06 文章 郭华威
退订

(无主题)

2021-04-06 文章 郭华威
退订

回复:(无主题)

2021-04-06 文章 yangxiaofei

退订



| |
hello
|
|
yangx...@163.com
|

Signature is customized by Netease Mail Master

在2021年04月06日 11:46,郭华威 写道:
退订

(无主题)

2021-04-05 文章 郭华威
退订

(无主题)

2021-03-30 文章 高耀军
退订


| |
高耀军
|
|
邮箱:18221112...@163.com
|

签名由 网易邮箱大师 定制

(无主题)

2021-03-29 文章 于林之下

TD

| |
于林之下
|
|
邮箱:sishengqikuo...@163.com
|

签名由 网易邮箱大师 定制

(无主题)

2021-03-16 文章 黄志高
各位大佬,在做远程提交任务到flink的standalone模式,抛以下异常
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:277)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:981)
at 
cn.encdata.cloud.dataengine.core.RemoteEnvironment.execute(RemoteEnvironment.java:145)
at 
cn.encdata.cloud.dataengine.core.builder.BatchJobBuilder.execute(BatchJobBuilder.java:89)
... 25 common frames omitted
Caused by: java.lang.InterruptedException: null
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:976)

(无主题)

2020-11-24 文章 gfjia
退订


| |
gfjia
|
|
邮箱:gfjia_t...@163.com
|

签名由 网易邮箱大师 定制

Re: (无主题)

2020-08-16 文章 art
hi,感谢提供的方案

是的,day是订单的创建时间

想请教下,你们的离线任务是每次都将全量订单数据一起修正吗,就是不管历史的有没有变化 都去作修正,

要是这样那会不会出现绝大部分情况,离线跑的任务都是无效的,因为历史数据未发生变化

> 在 2020年8月17日,下午1:22,zhiyezou <1530130...@qq.com> 写道:
> 
> HI 
> 
> 
> 这个day应该是订单的创建时间吧
> 
> 
> 我觉得我们遇到的问题有些类似,看下我们的方案对你是否有所帮助。
> 
> 
> 首先,我们会把day这个条件控制在3天(select * where day 
> >now-3),状态的TTL也是3天,即flink保留3天的状态,这样即使有3天前的数据到来也不会更新我们的结果表;这样可以解决更新错误的问题。
> 
> 
> 然后,通过离线任务来定时修正3天前的结果数据。这样可以保证数据的最终一致性
> 
> 
> 
> 
> 
> 
> -- 原始邮件 --
> 发件人:  
>   "user-zh"   
>      
>  发送时间: 2020年8月17日(星期一) 中午12:32
> 收件人: "user-zh@flink.apache.org" 
> 主题: (无主题)
> 
> 
> 
> hi,社区的小伙伴,大家好!我有一个应用场景,想请教下大家有没有遇过,有什么好的方案。
> 场景就是:按照user和day的维度统计订单表里的有效订单数,同时存在历史的订单状态随时可能被更新,比如可能当前把2个月前的订单状态置未true,所以没法根据历史结果预统计,翻译称sql就是select
>  user,day,count(*) from table where state = true group by 
> user,day;目前我已经用flink-sql-cdc-connector实现了,但是有一个问题就是state,因为按user day组合 
> 那么如果全部状态都保存后期回越来越大,但是如果设置ttl,那么如果历史订单变化,最终更新出去的值也不对。 
> 希望社区的小伙伴给我出出主意



(无主题)

2020-08-16 文章 superainbower
hi,社区的小伙伴,大家好!我有一个应用场景,想请教下大家有没有遇过,有什么好的方案。
场景就是:按照user和day的维度统计订单表里的有效订单数,同时存在历史的订单状态随时可能被更新,比如可能当前把2个月前的订单状态置未true,所以没法根据历史结果预统计,翻译称sql就是select
 user,day,count(*) from table where state = true group by 
user,day;目前我已经用flink-sql-cdc-connector实现了,但是有一个问题就是state,因为按user day组合 
那么如果全部状态都保存后期回越来越大,但是如果设置ttl,那么如果历史订单变化,最终更新出去的值也不对。 
希望社区的小伙伴给我出出主意

Re: (无主题)

2020-07-23 文章 shizk233
恭喜!

罗显宴 <15927482...@163.com> 于2020年7月23日周四 上午1:14写道:

> 感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的
> best
> shizk233
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月21日 15:04,罗显宴 写道:
> hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了
> val result = num.timeWindowAll(Time.seconds(20))
> //.trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
> .process(new
> ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] {
>
> private var itemState: MapState[String,Int] = _
>
> override def open(parameters: Configuration): Unit = {
> itemState = getRuntimeContext.getMapState(new
> MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int])))
>  }
>
> override def process(context: Context, elements:
> Iterable[IncreaseNumPerHour], out: Collector[IncreasePerHour]): Unit = {
> var timestamp:Long = 0L
> elements.foreach(kv => {
> itemState.put(kv.category, 1)
> timestamp = (kv.timestamp/2000+1)*2000
> })
> import scala.collection.JavaConversions._
>out.collect(IncreasePerHour(new Timestamp( timestamp - 1
> ).toString,itemState.keys().size))
>  }
>})
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 14:15,罗显宴<15927482...@163.com> 写道:
>
>
> hi,
>
> 我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 13:58,shizk233 写道:
> Hi,
>
> 首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。
>
> 按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
> 也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
> 这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
> 那么timer在12点的时候会输出的结果就是(12点,1)。
>
> 如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。
>
> 而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
> 期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。
>
> 我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿
>
>
> Best,
> shizk233
>
>
> 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:
>
> 好的,
> 输入:
> 心功能不全和心律失常用药,1,时间戳
> 心功能不全和心律失常用药,1,时间戳
> 抗利尿剂,1,时间戳
> 血管收缩剂,1,时间戳
> 血管紧张素II受体拮抗剂,1,时间戳
>
>
> 这里的时间戳就是eventtime了
> 比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
> 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
> 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
> 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
> 输出4
>
>
> 即输出:
> 2020-7-20 19:00:00,2
> 2020-7-20 19:00:20,4
>
>
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:37,shizk233 写道:
> Hi,
>
> 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:
>
> hi,
>
>
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:10,shizk233 写道:
> Hi,
>
>
> 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
> 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
>
>
> 你可以让acc做个累加,然后结果输出里把acc的值带上看看。
>
>
> Best,
> shizk233
>
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:
>
>
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
> 不好意思,刚才发的快,没来得及解释,
>
>
>
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
>
>


回复:(无主题)

2020-07-22 文章 罗显宴
感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的
best
shizk233


| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月21日 15:04,罗显宴 写道:
hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了
val result = num.timeWindowAll(Time.seconds(20))
//.trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
.process(new 
ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] {

private var itemState: MapState[String,Int] = _

override def open(parameters: Configuration): Unit = {
itemState = getRuntimeContext.getMapState(new 
MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int])))
 }

override def process(context: Context, elements: Iterable[IncreaseNumPerHour], 
out: Collector[IncreasePerHour]): Unit = {
var timestamp:Long = 0L
elements.foreach(kv => {
itemState.put(kv.category, 1)
timestamp = (kv.timestamp/2000+1)*2000
})
import scala.collection.JavaConversions._
   out.collect(IncreasePerHour(new Timestamp( timestamp - 1 
).toString,itemState.keys().size))
 }
   })


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 14:15,罗显宴<15927482...@163.com> 写道:


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制




回复: (无主题)

2020-07-21 文章 罗显宴
hi,我想到解决办法了,可以用全局window,我一直以为是要分区在做窗口运算其实可以直接用timewindowAll来算,然后用状态保存就够了
val result = num.timeWindowAll(Time.seconds(20))
//.trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
.process(new 
ProcessAllWindowFunction[IncreaseNumPerHour,IncreasePerHour,TimeWindow] {

private var itemState: MapState[String,Int] = _

override def open(parameters: Configuration): Unit = {
itemState = getRuntimeContext.getMapState(new 
MapStateDescriptor[String,Int]("item-state",TypeInformation.of(classOf[String]),TypeInformation.of(classOf[Int])))
  }

override def process(context: Context, elements: Iterable[IncreaseNumPerHour], 
out: Collector[IncreasePerHour]): Unit = {
var timestamp:Long = 0L
elements.foreach(kv => {
itemState.put(kv.category, 1)
timestamp = (kv.timestamp/2000+1)*2000
})
import scala.collection.JavaConversions._
out.collect(IncreasePerHour(new Timestamp( timestamp - 1 
).toString,itemState.keys().size))
  }
})


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 14:15,罗显宴<15927482...@163.com> 写道:


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制




回复: (无主题)

2020-07-20 文章 罗显宴


hi,
我觉得你说的是对的,我刚才没有理解trigger,我以为trigger当成一个1小时窗口的20分钟的小窗口了,其实我要的结果就是每20分钟有多少个窗口比如当前20分钟有A类型、B类型和C类型三个窗口,那么输出就是3,后来20分钟有A类型、B类型和D类型的结果,那么A类型和B类型是重复的只有D不是重复的,结果为4
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 13:58,shizk233 写道:
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:

好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

hi,


CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,


这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:




大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制




Re: (无主题)

2020-07-20 文章 shizk233
Hi,

首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。

按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。

如果12-13点里,A类型做相同的输入,MapState新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。

而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。

我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:

> 好的,
> 输入:
> 心功能不全和心律失常用药,1,时间戳
> 心功能不全和心律失常用药,1,时间戳
> 抗利尿剂,1,时间戳
> 血管收缩剂,1,时间戳
> 血管紧张素II受体拮抗剂,1,时间戳
>
>
> 这里的时间戳就是eventtime了
> 比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
> 心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
> 所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
> 接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
> 输出4
>
>
> 即输出:
> 2020-7-20 19:00:00,2
> 2020-7-20 19:00:20,4
>
>
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:37,shizk233 写道:
> Hi,
>
> 有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:
>
> hi,
>
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:10,shizk233 写道:
> Hi,
>
>
> 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
> 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
>
>
> 你可以让acc做个累加,然后结果输出里把acc的值带上看看。
>
>
> Best,
> shizk233
>
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:
>
>
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
> 不好意思,刚才发的快,没来得及解释,
>
>
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
>


回复: (无主题)

2020-07-20 文章 罗显宴
好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳


这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4


即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4




| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:37,shizk233 写道:
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

hi,

CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,

这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:



大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制



Re: (无主题)

2020-07-20 文章 shizk233
Hi,

有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:

> hi,
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月21日 11:10,shizk233 写道:
> Hi,
>
>
> 我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
> 而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
>
>
> 你可以让acc做个累加,然后结果输出里把acc的值带上看看。
>
>
> Best,
> shizk233
>
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:
>
>
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
> 不好意思,刚才发的快,没来得及解释,
>
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>


回复: (无主题)

2020-07-20 文章 罗显宴
hi,
CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,


我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。


你可以让acc做个累加,然后结果输出里把acc的值带上看看。


Best,
shizk233


罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:



大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制


Re: (无主题)

2020-07-20 文章 shizk233
Hi,

我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。

你可以让acc做个累加,然后结果输出里把acc的值带上看看。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:

>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> 罗显宴
> 邮箱:15927482...@163.com
>
> 
> 签名由 网易邮箱大师  定制
> 在2020年7月20日 20:38,罗显宴<15927482...@163.com> <15927482...@163.com> 写道:
>
> 不好意思,刚才发的快,没来得及解释,
> 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
>
>
> 罗显宴
> 邮箱:15927482...@163.com
>
> 
> 签名由 网易邮箱大师  定制
> 在2020年7月20日 14:09,罗显宴<15927482...@163.com> <15927482...@163.com> 写道:
>
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
>


回复: (无主题)

2020-07-20 文章 罗显宴


大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制
package com.bupt.main

import java.sql.Timestamp
import java.util
import java.util.Properties

import org.apache.flink.api.common.functions.{AggregateFunction, 
FlatMapFunction, MapFunction, RuntimeContext}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, 
MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import 
org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, 
RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests



case class Drug(id:String,ATCCode:String,MIMSType:String,
name:String,other:String, producer:String,
retailPrice:String,composition:String,
medicineRank:String, timestamp:Long)

case class IncreaseNumPerHour(category:String, num:Long,timestamp:Long)

case class ItemViewCount(category:String,windowEnd:Long)

case class IncreasePerHour(time:String,num:Long)

object KafkaToElasticSearch {
  def main(args: Array[String]): Unit = {
// 1. 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val props = new Properties
props.put("bootstrap.servers", "master:9092")
props.put("zookeeper.connect", "master:2181")
props.put("group.id", "drug-group")
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")

val drugs = env.addSource(new FlinkKafkaConsumer011[String]("drugs", 
//这个 kafka topic 需要和上面的工具类的 topic 一致
  new SimpleStringSchema, props)).setParallelism(1)
  .filter(string =>{
val words = string.split(",")
words.length == 10 && words(2).length!=0
  })
  .map(new MapFunction[String,Drug] {
  override def map(value: String): Drug = {
  val words = value.split(",")
if(words.length!=10)
println(words)
  
Drug(words(0),words(1),words(2),words(3),words(4),words(5),words(6),words(7),words(8),words(9).trim.toLong)
  }
} ).assignAscendingTimestamps( _.timestamp )
//drugs.print()
val num = drugs.map(drug => {
  var temp: StringBuilder = new StringBuilder(drug.MIMSType)

  if (temp.length != 0 && temp.charAt(0) == '-')
temp.deleteCharAt(0)
  if (temp.length != 0 && temp.charAt(temp.length - 1) == ';')
temp.deleteCharAt(temp.length - 1)
  var validateResult: String = null
  if (temp.length != 0) validateResult = temp.substring(0, temp.indexOf(' 
'))
  IncreaseNumPerHour(validateResult, 1l, drug.timestamp)
})

//num.print()
//
val result = num.keyBy(_.category)
  .timeWindow(Time.hours(1))
  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
  .aggregate(new CountAgg,new WindowResult)
  .keyBy(_.windowEnd)
  .process(new IncreaseItems)
result.print()

env.execute("compute num")
  }

}

// 自定义预聚合函数

//in、acc、out
//在这里面,累加器相当于是state
class CountAgg() extends AggregateFunct

回复: (无主题)

2020-07-20 文章 罗显宴
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制


回复: (无主题)

2020-07-20 文章 罗显宴
 我运行的时候,他直接按1小时窗口输出了,并没有按20秒连续输出递增
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制


回复:(无主题)

2020-07-20 文章 罗显宴
好的,谢谢大佬,我用这个试试


| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月20日 15:11,shizk233 写道:
Hi,

从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道:

>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>


Re: (无主题)

2020-07-20 文章 shizk233
Hi,

从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道:

>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年7月20日 11:47,shizk233 写道:
> Hi,
>
> 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
> Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
>
> Best,
> shizk233
>
> 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
>
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>


回复: (无主题)

2020-07-19 文章 罗显宴


不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:


大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制


Re: (无主题)

2020-07-19 文章 shizk233
Hi,

累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。

Best,
shizk233

罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:

>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望看到的大佬能帮我解惑一下,谢谢啦
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制


(无主题)

2020-07-19 文章 罗显宴

大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
 api,希望看到的大佬能帮我解惑一下,谢谢啦

| |
罗显宴
|
|
邮箱:15927482...@163.com
|

签名由 网易邮箱大师 定制

Re: (无主题)

2020-07-13 文章 Jingsong Li
Hi

退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org

Best
Jingsong

On Tue, Jul 14, 2020 at 12:36 PM 成欢晴  wrote:

> 退订
>
>
> | |
> chq19970719
> |
> |
> 邮箱:chq19970...@163.com
> |
>
> Signature is customized by Netease Mail Master



-- 
Best, Jingsong Lee


(无主题)

2020-07-13 文章 成欢晴
退订


| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

(无主题)

2020-04-16 文章 李朋


回复:(无主题)

2020-03-26 文章 被惊艳的时光
并发是200和400两种,集群有270多个节点,不过可用的vcores是6600多,内存是17T左右,看了执行图q43这个存在数据倾斜的的问题,失败的节点存在数据量偏大的情况



---原始邮件---
发件人: "Jingsong Li"

Re: (无主题)

2020-03-26 文章 Jingsong Li
Hi,

- 是否是计算规模的问题?
集群大小合适吗?并发合适吗?

- 是否是Plan不优的问题?
Hive的表有做Analysis吗?

CC: user

Best,
Jingsong Lee

On Thu, Mar 26, 2020 at 8:27 PM 被惊艳的时光 <2521929...@qq.com> wrote:

>
> hello,你好,有个关于flink-sql-benchmark工具的问题需要请教下,在做tpc-ds测试时,当数据量达到4T时(flink版本1.10),q43,q67,q70这三条sql执行出错了,都是在hashjoin的时候失败啦,报错信息是hashjoin迭代的次数过多,不知道之前你们在测试时有没有出现这种情况
>


-- 
Best, Jingsong Lee


转发:(无主题)

2020-03-18 文章 酷酷的浑蛋




| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制

- 转发邮件信息 -

发件人: 酷酷的浑蛋 
发送日期: 2020年3月18日 15:15
发送至: user-zh 


现在我发现个问题:flink sql实时 inner join ,结果会发生乱序,请问这是正常的吗
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制

转发:(无主题)

2020-03-13 文章 酷酷的浑蛋




| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制

- 转发邮件信息 -

发件人: 酷酷的浑蛋 
发送日期: 2020年3月13日 19:18
发送至: user-zh@flink.apache.org 
select a.x01,
   udf_name(a.x02)
from (select a.x01,
 a...
  from tb_name) a
  join
  (select * from tb_name) b
  on a.xx = b.xx
这是一段flink实时sql,其中总是偶尔有几条数据乱序,请问大佬们遇到过吗?或者该怎么解决?
| |
王太阳
|
|
apach...@163.com
|
签名由网易邮箱大师定制