Hi 从代码看 if(stateDate.equals("") || stateDate.equals(date)) 无法判断究竟是从哪里获取到stateDate变量赋值,不清楚你这里里面的判断逻辑是否能生效。 其次,state.clear() 之后,再次获取时,返回值会是null,代码片段里面也没有看出来哪里有对这个的校验。
祝好 唐云 ________________________________ From: 守护 <346531...@qq.com> Sent: Tuesday, March 31, 2020 12:33 To: user-zh <user-zh@flink.apache.org> Subject: ProcessWindowFunction中如何有效清除state呢 各位好: --版本 FLINK 1.10.0 ON YARN --过程 1.定义一个 .window(TumblingProcessingTimeWindows.of(Time.days(1)))窗口 2.定义一个new Trigger(),.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))) 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,业务是每天0点开始算这一天的数据,第二天清空从新计算, --问题 在 new ProcessWindowFunction()中创建了ValueState,想在第二天0点的时候ValueState清空开始重新计算,但是返现ValueState并没有清空,而是叠加前一天的继续计算,这个.clear()方法应该在什么时候加,才能生效呢? --部分代码 .window(TumblingProcessingTimeWindows.of(Time.days(1))) .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))) .process(new ProcessWindowFunction[(String,String,Long), String, Tuple, TimeWindow] { private var pv_st: ValueState[Long] = _ override def open(parameters: Configuration): Unit = { pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long])) } override def process(key: Tuple, context: Context, elements: Iterable[(String,String,Long)], out: Collector[String]): Unit = { var c_st = 0 val elementsIterator = elements.iterator // 遍历窗口数据,获取唯一word while (elementsIterator.hasNext) { val ac_name = elementsIterator.next()._2 if(!ac_name.isEmpty && ac_name.equals("listentime")){ c_st +=1 } } val time: Date = new Date() val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") val date = dateFormat.format(time) // add current pv_st.update(pv_st.value() + c_st) var jsonStr = ""+key.getField(0)+"_"+date+"&" // json格式开始 jsonStr += "{"+ "\"yesterday_foreground_play_pv\":\""+pv_st.value()+ "\"}"; //判断逻辑,是否到第二天,如果到第二天状态数据全部清空,重新累加 if(stateDate.equals("") || stateDate.equals(date)){ stateDate=date out.collect(jsonStr) }else{ out.collect(jsonStr) pv_st.clear() stateDate=date } } })