??????????
??????if(stateDate.equals("") || stateDate.equals(date))????????????????????pv_st.clear()????1.??????????????????pv_st????????????????????????????????2.state.clear() ????????????????????????????null????????????????????????null????????????????????????pv_st.update(pv_st.value() + c_st)???????????????????? ------------------ ???????? ------------------ ??????: "Yun Tang"<myas...@live.com>; ????????: 2020??3??31??(??????) ????3:59 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: ProcessWindowFunction??????????????state?? Hi ???????? if(stateDate.equals("") || stateDate.equals(date)) ??????????????????????????stateDate???????????????????????????????????????????????? ??????state.clear() ????????????????????????????null?????????????????????????????????????????????? ???? ???? ________________________________ From: ???? <346531...@qq.com> Sent: Tuesday, March 31, 2020 12:33 To: user-zh <user-zh@flink.apache.org> Subject: ProcessWindowFunction??????????????state?? ???????? --???? FLINK 1.10.0 ON YARN --???? 1.????????&nbsp; .window(TumblingProcessingTimeWindows.of(Time.days(1)))???? 2.????????new Trigger(????.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))) 3.????????new ProcessWindowFunction(),????????????????????????????????0?????????????????????????????????????????? --???? &nbsp;?? new ProcessWindowFunction()????????ValueState????????????0????????ValueState??????????????????????????ValueState??????????????????????????????????????????.clear()?????????????????????????????????? --???????? .window(TumblingProcessingTimeWindows.of(Time.days(1))) .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10))) .process(new ProcessWindowFunction[(String,String,Long), String, Tuple, TimeWindow] { &nbsp; &nbsp; &nbsp; &nbsp; private var pv_st: ValueState[Long] = _&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; override def open(parameters: Configuration): Unit = { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long])) &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp; &nbsp; &nbsp;override def process(key: Tuple, context: Context, elements: Iterable[(String,String,Long)], out: Collector[String]): Unit = { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var c_st = 0 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; val elementsIterator = elements.iterator &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // ??????????????????????word &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (elementsIterator.hasNext) { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; val ac_name = elementsIterator.next()._2 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(!ac_name.isEmpty &amp;&amp; ac_name.equals("listentime")){ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c_st +=1 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; val time: Date = new Date() &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; val date = dateFormat.format(time) &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // add current &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;pv_st.update(pv_st.value() + c_st) &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var jsonStr = ""+key.getField(0)+"_"+date+"&amp;" // json???????? &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; jsonStr += "{"+ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "\"yesterday_foreground_play_pv\":\""+pv_st.value()+ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "\"}"; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //?????????????????????????????????????????????????????????????? &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(stateDate.equals("") || stateDate.equals(date)){ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; stateDate=date &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out.collect(jsonStr) &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }else{ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out.collect(jsonStr) &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pv_st.clear() &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; stateDate=date &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp; &nbsp; })