对scala不是很熟悉,看你写的好复杂,下面是伪代码。 Ele{ id1; id2; ts; //timestamp minTs = ts; maxTs = ts; }
keyBy(id1,id2).window(EventTimeSessionWindows.withGap(Time.minutes(1))).reduce(xxxReduce, xxxProcessWindowFunc). xxxReduce(Ele ele1, Ele ele2){ return new Ele(ele1.id1, ele1.id2, null, min(ele1.minTs, ele2.minTs), max(ele1.maxTs, ele2.maxTs) ) } xxxProcessWindowFunc(Iterable<Ele> eles){ Ele ele = eles.iterator().next() durationMs = ele.maxTs - ele.minTs; collectOut(new XX(ele.id1, ele.id2, ele.minTs, ele.maxTs, duration)) // -->这代表了id1,id2的一次出现,从minTs开始出现,到maxTs结束,duration为xxx。 } 张锴 <zk357794...@gmail.com> 于2021年1月23日周六 下午5:35写道: > *我按照 session > > window分组,定义了最大最小状态,使用reduce+processWindowFunction方式重新跑了一下,出来的数据duration(秒),duration_time(时分秒)都是0。下面是我的程序,帮忙分析一下是哪里定义的有问题,为什么会出现这种情况,这个困扰了我好半天。* > > case class CloudLiveLogOnLine( > id: Long, > courseId: Long, > customerId: Long, > courseNumber: Long, > nickName: String, > partnerId: Long, > ip: String, > reportTime: String, > liveType: Int, > uid: String, > eventTime: Long > ) > > case class MinMaxTemp( > id: Long, > courseId: Long, > customerId: Long, > courseNumber: Long, > nickName: String, > partnerId: Long, > ip: String, > reportTime: String, > liveType: Int, > uid: String, > mineventTime: Long, > maxeventTime: Long > ) > > object OnLineFlinkTask { > > def main(args: Array[String]): Unit = { > > 配置省略。。。。 > > val dataStream: DataStream[CloudLiveLogOnLine] = stream.map(line => { > var id = 0L > var courseId = 0L > var courseNumber = 0L > var customerId = 0L > var nickName = "" > var partnerId = 0L > var ip = "" > var reportTime = "" > var liveType = 0 > var uid = "" > var eventTime = 0L > try { > val messageJson = JSON.parseObject(line) > val data: JSONObject = messageJson.getJSONObject("data") > id = data.getLong("id") > courseId = data.getLongValue("courseId") > courseNumber = data.getLongValue("courseNumber") > customerId = data.getLongValue("customerId") > nickName = data.getString("nickName") > partnerId = data.getLongValue("partnerId") > ip = data.getString("ip") > reportTime = data.getString("reportTime") > liveType = data.getIntValue("liveType") > uid = data.getString("uid") > eventTime = messageJson.getLongValue("eventTime") > } catch { > case e => println(line) > } > CloudLiveLogOnLine(id, courseId, customerId, courseNumber, > nickName, partnerId, ip, reportTime, liveType, uid, eventTime) > }).assignAscendingTimestamps(_.eventTime) > > // 3. transform 处理数据 > val ds = dataStream > .filter(_.liveType == 1) > .map(r=>MinMaxTemp(r.id > ,r.courseId,r.customerId,r.courseNumber,r.nickName,r.partnerId, > r.ip,r.reportTime,r.liveType,r.uid,r.eventTime,r.eventTime)) > .keyBy(1, 2) > .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > .reduce(new myReduceFunc(),new AssignWindowProcessFunc()) > > ds.print() > env.execute("flink job") > > } > > } > > class myReduceFunc() extends ReduceFunction[MinMaxTemp]{ > override def reduce(value1: MinMaxTemp, value2: MinMaxTemp): MinMaxTemp > = { > MinMaxTemp(value1.id > ,value1.courseId,value1.customerId,value1.courseNumber,value1.nickName, > > value1.partnerId,value1.ip,value1.reportTime,value1.liveType,value1.uid,value1.mineventTime.min(value2.mineventTime), > > value1.maxeventTime.max(value2.maxeventTime)) > > } > } > > class AssignWindowProcessFunc() extends > ProcessWindowFunction[MinMaxTemp,CloudliveWatcher,Tuple,TimeWindow]{ > > private var minTsState: ValueState[Long] = _ > private var maxTsState: ValueState[Long] = _ > > override def open(parameters: Configuration): Unit = { > minTsState =getRuntimeContext.getState(new > ValueStateDescriptor[Long]("min-state",classOf[Long])) > maxTsState =getRuntimeContext.getState(new > ValueStateDescriptor[Long]("max-state",classOf[Long])) > } > > override def process(key: Tuple, context: Context, elements: > Iterable[MinMaxTemp], out: Collector[CloudliveWatcher]): Unit = { > val minTs: Long = minTsState.value() //取出上一个时间戳最小值 > val maxTs: Long = maxTsState.value() //取出上一个时间戳最大值 > > val device_type = 0 > val net_opretor = "" > val net_type = "" > val area = "" > val plat_form = "" > val network_operator = "" > val role = 0 > val useragent = "" > val currentDate = DateUtil.currentDate > val created_time = currentDate > val modified_time = currentDate > var id =0L > var courseId =0L > var partnerId =0L > var ip ="" > var customerId =0L > var courseNumber =0L > var nickName ="" > var liveType =0 > var uid ="" > var eventTime =0L > var min =0L > var max =0L > var join_time ="" > var leave_time ="" > var duration =0L > var duration_time ="" > val iterator: Iterator[MinMaxTemp] = elements.iterator > if (iterator.hasNext) { > val value: MinMaxTemp = iterator.next() > id = value.id > courseId= value.courseId > partnerId = value.partnerId > ip = value.ip > customerId = value.customerId > courseNumber = value.courseNumber > nickName = value.nickName > liveType = value.liveType > uid = value.uid > minTsState.update(value.mineventTime) //更新最小时间戳 > maxTsState.update(value.maxeventTime) //更新最大时间戳 > } > join_time = DateUtil.convertTimeStamp2DateStr(minTs, > DateUtil.SECOND_DATE_FORMAT) > leave_time = DateUtil.convertTimeStamp2DateStr(maxTs, > DateUtil.SECOND_DATE_FORMAT) > duration = (maxTs - minTs) / 1000 //停留多少秒 > duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > minTsState.clear() > maxTsState.clear() > > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > join_time, leave_time, created_time, modified_time > , liveType, plat_form, duration, duration_time, > network_operator, role, useragent, uid, eventTime)) > > CloudliveWatcher(id, partnerId, courseId, customerId, > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > join_time, leave_time, created_time, modified_time > , liveType, plat_form, duration, duration_time, > network_operator, role, useragent, uid, eventTime) > > } > } > > > > > > 赵一旦 <hinobl...@gmail.com> 于2021年1月21日周四 下午8:38写道: > > > 我表达的方法是按照session > > window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。 > > > > 实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。 > > > > > > > 然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。 > > > > 赵一旦 <hinobl...@gmail.com> 于2021年1月21日周四 下午8:28写道: > > > > > 我其实没看懂你逻辑。这个和窗口的最大最小时间戳啥关系。 > > > > > > 张锴 <zk357794...@gmail.com> 于2021年1月21日周四 下午6:25写道: > > > > > >> > 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 > > >> > > >> > > > context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 > > >> 下面是我的部分代码逻辑: > > >> > > >> val ds = dataStream > > >> .filter(_.liveType == 1) > > >> .keyBy(1, 2) > > >> .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > > >> .process(new myProcessWindow()).uid("process-id") > > >> > > >> class myProcessWindow() extends > > >> ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, > > >> TimeWindow] { > > >> > > >> override def process(key: Tuple, context: Context, elements: > > >> Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit > > >> = { > > >> var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 > > >> var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 > > >> > > >> val currentDate = DateUtil.currentDate > > >> val created_time = currentDate > > >> val modified_time = currentDate > > >> 。。。 > > >> > > >> val join_time: String = > > >> DateUtil.convertTimeStamp2DateStr(startTime, > > >> DateUtil.SECOND_DATE_FORMAT) > > >> val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > > >> DateUtil.SECOND_DATE_FORMAT) > > >> val duration = (endTime - startTime) / 1000 //停留多少秒 > > >> val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 > > >> out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > >> join_time, leave_time, created_time, modified_time > > >> , liveType, plat_form, duration, duration_time, > > >> network_operator, role, useragent, uid, eventTime)) > > >> > > >> CloudliveWatcher(id, partnerId, courseId, customerId, > > >> courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > >> join_time, leave_time, created_time, modified_time > > >> , liveType, plat_form, duration, duration_time, > > >> network_operator, role, useragent, uid, eventTime) > > >> > > >> } > > >> > > >> > > >> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? > > >> > > >> > > >> > > >> > > >> 赵一旦 <hinobl...@gmail.com> 于2020年12月28日周一 下午7:12写道: > > >> > > >> > 按直播间ID和用户ID分组,使用session > > >> window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 > > >> > > > >> > 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 > > >> > > > >> > session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 > > >> > > > >> > > > >> > 张锴 <zk357794...@gmail.com> 于2020年12月28日周一 下午5:35写道: > > >> > > > >> > > 能描述一下用session window的考虑吗 > > >> > > > > >> > > Akisaya <akikevins...@gmail.com> 于2020年12月28日周一 下午5:00写道: > > >> > > > > >> > > > 这个可以用 session window 吧 > > >> > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > >> > > > > > >> > > > news_...@163.com <news_...@163.com> 于2020年12月28日周一 下午2:15写道: > > >> > > > > > >> > > > > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > news_...@163.com > > >> > > > > > > >> > > > > 发件人: 张锴 > > >> > > > > 发送时间: 2020-12-28 13:35 > > >> > > > > 收件人: user-zh > > >> > > > > 主题: 根据业务需求选择合适的flink state > > >> > > > > 各位大佬帮我分析下如下需求应该怎么写 > > >> > > > > > > >> > > > > 需求说明: > > >> > > > > > > >> > > > > >> > > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > >> > > > > > > >> > > > > 我的想法: > > >> > > > > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event > > >> Time中的分钟数 > > >> > > > > > > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > >> > > > > > > >> > > > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > >> > > > > > > >> > > > > flink 版本1.10.1 > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > > > >