回复: 根据业务需求选择合适的flink state
退订 | | 纪军伟 | | jjw8610...@163.com | 签名由网易邮箱大师定制 在2021年01月23日 15:43,徐州州<25977...@qq.com> 写道: 我觉得你可以尝试一下TTL,keyby之后设置key状态的失效时间为1分钟,如果一分钟没数据进来就清空state。 -- 原始邮件 -- 发件人: "张锴"https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > news_...@163.com
Re: 根据业务需求选择合适的flink state
ATE_FORMAT) > > >> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > > >> DateUtil.SECOND_DATE_FORMAT) > > >> val duration = (endTime - startTime) / 1000 //停留多少秒 > > >> val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > > >> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > >> join_time, leave_time, created_time, modified_time > > >> , liveType, plat_form, duration, duration_time, > > >> network_operator, role, useragent, uid, eventTime)) > > >> > > >> CloudliveWatcher(id, partnerId, courseId, customerId, > > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > >> join_time, leave_time, created_time, modified_time > > >> , liveType, plat_form, duration, duration_time, > > >> network_operator, role, useragent, uid, eventTime) > > >> > > >> } > > >> > > >> > > >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? > > >> > > >> > > >> > > >> > > >> 赵一旦 于2020年12月28日周一 下午7:12写道: > > >> > > >> > 按直播间ID和用户ID分组,使用session > > >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > >> > > > >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > >> > > > >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > >> > > > >> > > > >> > 张锴 于2020年12月28日周一 下午5:35写道: > > >> > > > >> > > 能描述一下用session window的考虑吗 > > >> > > > > >> > > Akisaya 于2020年12月28日周一 下午5:00写道: > > >> > > > > >> > > > 这个可以用 session window 吧 > > >> > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > >> > > > > > >> > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > >> > > > > > >> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > news_...@163.com > > >> > > > > > > >> > > > > 发件人: 张锴 > > >> > > > > 发送时间: 2020-12-28 13:35 > > >> > > > > 收件人: user-zh > > >> > > > > 主题: 根据业务需求选择合适的flink state > > >> > > > > 各位大佬帮我分析下如下需求应该怎么写 > > >> > > > > > > >> > > > > 需求说明: > > >> > > > > > > >> > > > > >> > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > >> > > > > > > >> > > > > 我的想法: > > >> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event > > >> Time中的分钟数 > > >> > > > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > >> > > > > > > >> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > >> > > > > > > >> > > > > flink 版本1.10.1 > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > > > >
Re: 根据业务需求选择合适的flink state
[MinMaxTemp] = elements.iterator if (iterator.hasNext) { val value: MinMaxTemp = iterator.next() id = value.id courseId= value.courseId partnerId = value.partnerId ip = value.ip customerId = value.customerId courseNumber = value.courseNumber nickName = value.nickName liveType = value.liveType uid = value.uid minTsState.update(value.mineventTime) //更新最小时间戳 maxTsState.update(value.maxeventTime) //更新最大时间戳 } join_time = DateUtil.convertTimeStamp2DateStr(minTs, DateUtil.SECOND_DATE_FORMAT) leave_time = DateUtil.convertTimeStamp2DateStr(maxTs, DateUtil.SECOND_DATE_FORMAT) duration = (maxTs - minTs) / 1000 //停留多少秒 duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 minTsState.clear() maxTsState.clear() out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime)) CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime) } } 赵一旦 于2021年1月21日周四 下午8:38写道: > 我表达的方法是按照session > window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。 > > 实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。 > > > 然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。 > > 赵一旦 于2021年1月21日周四 下午8:28写道: > > > 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。 > > > > 张锴 于2021年1月21日周四 下午6:25写道: > > > >> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 > >> > >> > context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 > >> 下面是我的部分代码逻辑: > >> > >> val ds = dataStream > >> .filter(_.liveType == 1) > >> .keyBy(1, 2) > >> .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > >> .process(new myProcessWindow()).uid("process-id") > >> > >> class myProcessWindow() extends > >> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, > >> TimeWindow] { > >> > >> override def process(key: Tuple, context: Context, elements: > >> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit > >> = { > >> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 > >> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 > >> > >> val currentDate = DateUtil.currentDate > >> val created_time = currentDate > >> val modified_time = currentDate > >> 。。。 > >> > >> val join_time: String = > >> DateUtil.convertTimeStamp2DateStr(startTime, > >> DateUtil.SECOND_DATE_FORMAT) > >> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > >> DateUtil.SECOND_DATE_FORMAT) > >> val duration = (endTime - startTime) / 1000 //停留多少秒 > >> val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > >> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > >> join_time, leave_time, created_time, modified_time > >> , liveType, plat_form, duration, duration_time, > >> network_operator, role, useragent, uid, eventTime)) > >> > >> CloudliveWatcher(id, partnerId, courseId, customerId, > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > >> join_time, leave_time, created_time, modified_time > >> , liveType, plat_form, duration, duration_time, > >> network_operator, role, useragent, uid, eventTime) > >> > >> } > >> > >> > >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? > >> > >> > >> > >> > >> 赵一旦 于2020年12月28日周一 下午7:12写道: > >> > >> > 按直播间ID和用户ID分组,使用session > >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > >> > > >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > >> > > >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > >> > > >> > > >> > 张锴 于2020年12月28日周一 下午5:35写道: > >> > > >> > > 能描述一下用session window的考虑吗 > >> > > > >> > > Akisaya 于2020年12月28日周一 下午
Re: 根据业务需求选择合适的flink state
@赵一旦 可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题 赵一旦 于2021年1月22日周五 上午10:10写道: > 我理解你要的最终mysql结果表是: > 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间); > > 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。 > > > 如上按照我的方案就可以实现哈。 > > xuhaiLong 于2021年1月22日周五 上午10:03写道: > > > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 > > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum > 试试? > > > > > > 在2021年1月21日 18:24,张锴 写道: > > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 > > > > > context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 > > 下面是我的部分代码逻辑: > > > > val ds = dataStream > > .filter(_.liveType == 1) > > .keyBy(1, 2) > > .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > > .process(new myProcessWindow()).uid("process-id") > > > > class myProcessWindow() extends > > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, > > TimeWindow] { > > > > override def process(key: Tuple, context: Context, elements: > > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit > > = { > > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 > > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 > > > > val currentDate = DateUtil.currentDate > > val created_time = currentDate > > val modified_time = currentDate > > 。。。 > > > > val join_time: String = > > DateUtil.convertTimeStamp2DateStr(startTime, > > DateUtil.SECOND_DATE_FORMAT) > > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > > DateUtil.SECOND_DATE_FORMAT) > > val duration = (endTime - startTime) / 1000 //停留多少秒 > > val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > join_time, leave_time, created_time, modified_time > > , liveType, plat_form, duration, duration_time, > > network_operator, role, useragent, uid, eventTime)) > > > > CloudliveWatcher(id, partnerId, courseId, customerId, > > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > join_time, leave_time, created_time, modified_time > > , liveType, plat_form, duration, duration_time, > > network_operator, role, useragent, uid, eventTime) > > > > } > > > > > > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? > > > > > > > > > > 赵一旦 于2020年12月28日周一 下午7:12写道: > > > > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > > > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > > > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > > > > > 张锴 于2020年12月28日周一 下午5:35写道: > > > > 能描述一下用session window的考虑吗 > > > > Akisaya 于2020年12月28日周一 下午5:00写道: > > > > 这个可以用 session window 吧 > > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > news_...@163.com > > > > 发件人: 张锴 > > 发送时间: 2020-12-28 13:35 > > 收件人: user-zh > > 主题: 根据业务需求选择合适的flink state > > 各位大佬帮我分析下如下需求应该怎么写 > > > > 需求说明: > > > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > > > > > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > 我的想法: > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > flink 版本1.10.1 > > > > > > > > > > >
Re: 根据业务需求选择合适的flink state
@赵一旦 我今天调整一下逻辑再试试 赵一旦 于2021年1月22日周五 上午10:10写道: > 我理解你要的最终mysql结果表是: > 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间); > > 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。 > > > 如上按照我的方案就可以实现哈。 > > xuhaiLong 于2021年1月22日周五 上午10:03写道: > > > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 > > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum > 试试? > > > > > > 在2021年1月21日 18:24,张锴 写道: > > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 > > > > > context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 > > 下面是我的部分代码逻辑: > > > > val ds = dataStream > > .filter(_.liveType == 1) > > .keyBy(1, 2) > > .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > > .process(new myProcessWindow()).uid("process-id") > > > > class myProcessWindow() extends > > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, > > TimeWindow] { > > > > override def process(key: Tuple, context: Context, elements: > > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit > > = { > > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 > > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 > > > > val currentDate = DateUtil.currentDate > > val created_time = currentDate > > val modified_time = currentDate > > 。。。 > > > > val join_time: String = > > DateUtil.convertTimeStamp2DateStr(startTime, > > DateUtil.SECOND_DATE_FORMAT) > > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > > DateUtil.SECOND_DATE_FORMAT) > > val duration = (endTime - startTime) / 1000 //停留多少秒 > > val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > join_time, leave_time, created_time, modified_time > > , liveType, plat_form, duration, duration_time, > > network_operator, role, useragent, uid, eventTime)) > > > > CloudliveWatcher(id, partnerId, courseId, customerId, > > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > join_time, leave_time, created_time, modified_time > > , liveType, plat_form, duration, duration_time, > > network_operator, role, useragent, uid, eventTime) > > > > } > > > > > > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? > > > > > > > > > > 赵一旦 于2020年12月28日周一 下午7:12写道: > > > > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > > > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > > > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > > > > > 张锴 于2020年12月28日周一 下午5:35写道: > > > > 能描述一下用session window的考虑吗 > > > > Akisaya 于2020年12月28日周一 下午5:00写道: > > > > 这个可以用 session window 吧 > > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > news_...@163.com > > > > 发件人: 张锴 > > 发送时间: 2020-12-28 13:35 > > 收件人: user-zh > > 主题: 根据业务需求选择合适的flink state > > 各位大佬帮我分析下如下需求应该怎么写 > > > > 需求说明: > > > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > > > > > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > 我的想法: > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > flink 版本1.10.1 > > > > > > > > > > >
Re: 根据业务需求选择合适的flink state
我理解你要的最终mysql结果表是: 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间); 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。 如上按照我的方案就可以实现哈。 xuhaiLong 于2021年1月22日周五 上午10:03写道: > 可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 > userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试? > > > 在2021年1月21日 18:24,张锴 写道: > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 > > context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 > 下面是我的部分代码逻辑: > > val ds = dataStream > .filter(_.liveType == 1) > .keyBy(1, 2) > .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > .process(new myProcessWindow()).uid("process-id") > > class myProcessWindow() extends > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, > TimeWindow] { > > override def process(key: Tuple, context: Context, elements: > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit > = { > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 > > val currentDate = DateUtil.currentDate > val created_time = currentDate > val modified_time = currentDate > 。。。 > > val join_time: String = > DateUtil.convertTimeStamp2DateStr(startTime, > DateUtil.SECOND_DATE_FORMAT) > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > DateUtil.SECOND_DATE_FORMAT) > val duration = (endTime - startTime) / 1000 //停留多少秒 > val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > join_time, leave_time, created_time, modified_time > , liveType, plat_form, duration, duration_time, > network_operator, role, useragent, uid, eventTime)) > > CloudliveWatcher(id, partnerId, courseId, customerId, > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > join_time, leave_time, created_time, modified_time > , liveType, plat_form, duration, duration_time, > network_operator, role, useragent, uid, eventTime) > > } > > > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? > > > > > 赵一旦 于2020年12月28日周一 下午7:12写道: > > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > > 张锴 于2020年12月28日周一 下午5:35写道: > > 能描述一下用session window的考虑吗 > > Akisaya 于2020年12月28日周一 下午5:00写道: > > 这个可以用 session window 吧 > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > news_...@163.com > > 发件人: 张锴 > 发送时间: 2020-12-28 13:35 > 收件人: user-zh > 主题: 根据业务需求选择合适的flink state > 各位大佬帮我分析下如下需求应该怎么写 > > 需求说明: > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > 我的想法: > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > flink 版本1.10.1 > > > > >
回复: 根据业务需求选择合适的flink state
可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试? 在2021年1月21日 18:24,张锴 写道: 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 下面是我的部分代码逻辑: val ds = dataStream .filter(_.liveType == 1) .keyBy(1, 2) .window(EventTimeSessionWindows.withGap(Time.minutes(1))) .process(new myProcessWindow()).uid("process-id") class myProcessWindow() extends ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, TimeWindow] { override def process(key: Tuple, context: Context, elements: Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit = { var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 val currentDate = DateUtil.currentDate val created_time = currentDate val modified_time = currentDate 。。。 val join_time: String = DateUtil.convertTimeStamp2DateStr(startTime, DateUtil.SECOND_DATE_FORMAT) val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, DateUtil.SECOND_DATE_FORMAT) val duration = (endTime - startTime) / 1000 //停留多少秒 val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime)) CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime) } 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? 赵一旦 于2020年12月28日周一 下午7:12写道: 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 张锴 于2020年12月28日周一 下午5:35写道: 能描述一下用session window的考虑吗 Akisaya 于2020年12月28日周一 下午5:00写道: 这个可以用 session window 吧 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows news_...@163.com 于2020年12月28日周一 下午2:15写道: 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 news_...@163.com 发件人: 张锴 发送时间: 2020-12-28 13:35 收件人: user-zh 主题: 根据业务需求选择合适的flink state 各位大佬帮我分析下如下需求应该怎么写 需求说明: 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 我的想法: 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 flink 版本1.10.1
回复: 根据业务需求选择合适的flink state
Hi, 看了下你的代码,用session window 时长为1分钟,表示的是user1 的窗口在1分钟内没收到数据就进行一个触发计算,所以最终得到的结果应该是需要你把 user1 产生的每条记录的时长做一个sum,如果只看单条维度是不全的 在2021年1月21日 18:24,张锴 写道: 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 下面是我的部分代码逻辑: val ds = dataStream .filter(_.liveType == 1) .keyBy(1, 2) .window(EventTimeSessionWindows.withGap(Time.minutes(1))) .process(new myProcessWindow()).uid("process-id") class myProcessWindow() extends ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, TimeWindow] { override def process(key: Tuple, context: Context, elements: Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit = { var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 val currentDate = DateUtil.currentDate val created_time = currentDate val modified_time = currentDate 。。。 val join_time: String = DateUtil.convertTimeStamp2DateStr(startTime, DateUtil.SECOND_DATE_FORMAT) val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, DateUtil.SECOND_DATE_FORMAT) val duration = (endTime - startTime) / 1000 //停留多少秒 val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime)) CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime) } 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? 赵一旦 于2020年12月28日周一 下午7:12写道: 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 张锴 于2020年12月28日周一 下午5:35写道: 能描述一下用session window的考虑吗 Akisaya 于2020年12月28日周一 下午5:00写道: 这个可以用 session window 吧 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows news_...@163.com 于2020年12月28日周一 下午2:15写道: 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 news_...@163.com 发件人: 张锴 发送时间: 2020-12-28 13:35 收件人: user-zh 主题: 根据业务需求选择合适的flink state 各位大佬帮我分析下如下需求应该怎么写 需求说明: 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 我的想法: 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 flink 版本1.10.1
Re: 根据业务需求选择合适的flink state
我表达的方法是按照session window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。 实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。 然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。 赵一旦 于2021年1月21日周四 下午8:28写道: > 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。 > > 张锴 于2021年1月21日周四 下午6:25写道: > >> 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 >> >> context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 >> 下面是我的部分代码逻辑: >> >> val ds = dataStream >> .filter(_.liveType == 1) >> .keyBy(1, 2) >> .window(EventTimeSessionWindows.withGap(Time.minutes(1))) >> .process(new myProcessWindow()).uid("process-id") >> >> class myProcessWindow() extends >> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, >> TimeWindow] { >> >> override def process(key: Tuple, context: Context, elements: >> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit >> = { >> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 >> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 >> >> val currentDate = DateUtil.currentDate >> val created_time = currentDate >> val modified_time = currentDate >> 。。。 >> >> val join_time: String = >> DateUtil.convertTimeStamp2DateStr(startTime, >> DateUtil.SECOND_DATE_FORMAT) >> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, >> DateUtil.SECOND_DATE_FORMAT) >> val duration = (endTime - startTime) / 1000 //停留多少秒 >> val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 >> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area, >> join_time, leave_time, created_time, modified_time >> , liveType, plat_form, duration, duration_time, >> network_operator, role, useragent, uid, eventTime)) >> >> CloudliveWatcher(id, partnerId, courseId, customerId, >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area, >> join_time, leave_time, created_time, modified_time >> , liveType, plat_form, duration, duration_time, >> network_operator, role, useragent, uid, eventTime) >> >> } >> >> >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? >> >> >> >> >> 赵一旦 于2020年12月28日周一 下午7:12写道: >> >> > 按直播间ID和用户ID分组,使用session >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 >> > >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 >> > >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 >> > >> > >> > 张锴 于2020年12月28日周一 下午5:35写道: >> > >> > > 能描述一下用session window的考虑吗 >> > > >> > > Akisaya 于2020年12月28日周一 下午5:00写道: >> > > >> > > > 这个可以用 session window 吧 >> > > > >> > > > >> > > >> > >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows >> > > > >> > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: >> > > > >> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 >> > > > > >> > > > > >> > > > > >> > > > > news_...@163.com >> > > > > >> > > > > 发件人: 张锴 >> > > > > 发送时间: 2020-12-28 13:35 >> > > > > 收件人: user-zh >> > > > > 主题: 根据业务需求选择合适的flink state >> > > > > 各位大佬帮我分析下如下需求应该怎么写 >> > > > > >> > > > > 需求说明: >> > > > > >> > > >> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A >> > > > > >> > > > > >> > > > >> > > >> > >> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 >> > > > > >> > > > > 我的想法: >> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event >> Time中的分钟数 >> > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 >> > > > > >> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 >> > > > > >> > > > > flink 版本1.10.1 >> > > > > >> > > > >> > > >> > >> >
Re: 根据业务需求选择合适的flink state
我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。 张锴 于2021年1月21日周四 下午6:25写道: > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 > > context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 > 下面是我的部分代码逻辑: > > val ds = dataStream > .filter(_.liveType == 1) > .keyBy(1, 2) > .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > .process(new myProcessWindow()).uid("process-id") > > class myProcessWindow() extends > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, > TimeWindow] { > > override def process(key: Tuple, context: Context, elements: > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit > = { > var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 > var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 > > val currentDate = DateUtil.currentDate > val created_time = currentDate > val modified_time = currentDate > 。。。 > > val join_time: String = > DateUtil.convertTimeStamp2DateStr(startTime, > DateUtil.SECOND_DATE_FORMAT) > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > DateUtil.SECOND_DATE_FORMAT) > val duration = (endTime - startTime) / 1000 //停留多少秒 > val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > join_time, leave_time, created_time, modified_time > , liveType, plat_form, duration, duration_time, > network_operator, role, useragent, uid, eventTime)) > > CloudliveWatcher(id, partnerId, courseId, customerId, > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > join_time, leave_time, created_time, modified_time > , liveType, plat_form, duration, duration_time, > network_operator, role, useragent, uid, eventTime) > > } > > > 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? > > > > > 赵一旦 于2020年12月28日周一 下午7:12写道: > > > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > > > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > > > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > > > > > 张锴 于2020年12月28日周一 下午5:35写道: > > > > > 能描述一下用session window的考虑吗 > > > > > > Akisaya 于2020年12月28日周一 下午5:00写道: > > > > > > > 这个可以用 session window 吧 > > > > > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > > > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > > > > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > > > > > > > > > > > > > news_...@163.com > > > > > > > > > > 发件人: 张锴 > > > > > 发送时间: 2020-12-28 13:35 > > > > > 收件人: user-zh > > > > > 主题: 根据业务需求选择合适的flink state > > > > > 各位大佬帮我分析下如下需求应该怎么写 > > > > > > > > > > 需求说明: > > > > > > > > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > > > > > > > > > > > > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > > > > > > > 我的想法: > > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event > Time中的分钟数 > > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > > > > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > > > > > > > flink 版本1.10.1 > > > > > > > > > > > > > > >
Re: 根据业务需求选择合适的flink state
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 下面是我的部分代码逻辑: val ds = dataStream .filter(_.liveType == 1) .keyBy(1, 2) .window(EventTimeSessionWindows.withGap(Time.minutes(1))) .process(new myProcessWindow()).uid("process-id") class myProcessWindow() extends ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, TimeWindow] { override def process(key: Tuple, context: Context, elements: Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit = { var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 val currentDate = DateUtil.currentDate val created_time = currentDate val modified_time = currentDate 。。。 val join_time: String = DateUtil.convertTimeStamp2DateStr(startTime, DateUtil.SECOND_DATE_FORMAT) val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, DateUtil.SECOND_DATE_FORMAT) val duration = (endTime - startTime) / 1000 //停留多少秒 val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime)) CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime) } 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? 赵一旦 于2020年12月28日周一 下午7:12写道: > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > > 张锴 于2020年12月28日周一 下午5:35写道: > > > 能描述一下用session window的考虑吗 > > > > Akisaya 于2020年12月28日周一 下午5:00写道: > > > > > 这个可以用 session window 吧 > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > > > > > > > > > news_...@163.com > > > > > > > > 发件人: 张锴 > > > > 发送时间: 2020-12-28 13:35 > > > > 收件人: user-zh > > > > 主题: 根据业务需求选择合适的flink state > > > > 各位大佬帮我分析下如下需求应该怎么写 > > > > > > > > 需求说明: > > > > > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > > > > > > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > > > > > 我的想法: > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > > > > > flink 版本1.10.1 > > > > > > > > > >
Re: 根据业务需求选择合适的flink state
感谢你,稍后我会按这种思路试试 赵一旦 于2020年12月28日周一 下午7:12写道: > 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > > 张锴 于2020年12月28日周一 下午5:35写道: > > > 能描述一下用session window的考虑吗 > > > > Akisaya 于2020年12月28日周一 下午5:00写道: > > > > > 这个可以用 session window 吧 > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > > > > > > > > > news_...@163.com > > > > > > > > 发件人: 张锴 > > > > 发送时间: 2020-12-28 13:35 > > > > 收件人: user-zh > > > > 主题: 根据业务需求选择合适的flink state > > > > 各位大佬帮我分析下如下需求应该怎么写 > > > > > > > > 需求说明: > > > > > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > > > > > > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > > > > > 我的想法: > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > > > > > flink 版本1.10.1 > > > > > > > > > >
Re: 根据业务需求选择合适的flink state
按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 张锴 于2020年12月28日周一 下午5:35写道: > 能描述一下用session window的考虑吗 > > Akisaya 于2020年12月28日周一 下午5:00写道: > > > 这个可以用 session window 吧 > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > > > > > news_...@163.com > > > > > > 发件人: 张锴 > > > 发送时间: 2020-12-28 13:35 > > > 收件人: user-zh > > > 主题: 根据业务需求选择合适的flink state > > > 各位大佬帮我分析下如下需求应该怎么写 > > > > > > 需求说明: > > > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > > > 我的想法: > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > > > flink 版本1.10.1 > > > > > >
Re: 根据业务需求选择合适的flink state
能描述一下用session window的考虑吗 Akisaya 于2020年12月28日周一 下午5:00写道: > 这个可以用 session window 吧 > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > news_...@163.com > > > > 发件人: 张锴 > > 发送时间: 2020-12-28 13:35 > > 收件人: user-zh > > 主题: 根据业务需求选择合适的flink state > > 各位大佬帮我分析下如下需求应该怎么写 > > > > 需求说明: > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > 我的想法: > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > flink 版本1.10.1 > > >
Re: 根据业务需求选择合适的flink state
能描述一下用session window的考虑吗 Akisaya 于2020年12月28日周一 下午5:00写道: > 这个可以用 session window 吧 > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > news_...@163.com 于2020年12月28日周一 下午2:15写道: > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > > > > > news_...@163.com > > > > 发件人: 张锴 > > 发送时间: 2020-12-28 13:35 > > 收件人: user-zh > > 主题: 根据业务需求选择合适的flink state > > 各位大佬帮我分析下如下需求应该怎么写 > > > > 需求说明: > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > > > 我的想法: > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > > > flink 版本1.10.1 > > >
Re: 根据业务需求选择合适的flink state
这个可以用 session window 吧 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows news_...@163.com 于2020年12月28日周一 下午2:15写道: > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > news_...@163.com > > 发件人: 张锴 > 发送时间: 2020-12-28 13:35 > 收件人: user-zh > 主题: 根据业务需求选择合适的flink state > 各位大佬帮我分析下如下需求应该怎么写 > > 需求说明: > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > 我的想法: > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > flink 版本1.10.1 >
Re: 根据业务需求选择合适的flink state
这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 news_...@163.com 发件人: 张锴 发送时间: 2020-12-28 13:35 收件人: user-zh 主题: 根据业务需求选择合适的flink state 各位大佬帮我分析下如下需求应该怎么写 需求说明: 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 我的想法: 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 flink 版本1.10.1
根据业务需求选择合适的flink state
各位大佬帮我分析下如下需求应该怎么写 需求说明: 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 我的想法: 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 flink 版本1.10.1