??????????

??????if(stateDate.equals("") || 
stateDate.equals(date))????????????????????pv_st.clear()????1.??????????????????pv_st????????????????????????????????2.state.clear()
 
????????????????????????????null????????????????????????null????????????????????????pv_st.update(pv_st.value()
 + c_st)????????????????????








------------------ ???????? ------------------
??????:&nbsp;"Yun Tang"<myas...@live.com&gt;;
????????:&nbsp;2020??3??31??(??????) ????3:59
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re:  ProcessWindowFunction??????????????state??



Hi

???????? if(stateDate.equals("") || stateDate.equals(date)) 
??????????????????????????stateDate????????????????????????????????????????????????
??????state.clear() 
????????????????????????????null??????????????????????????????????????????????

????
????
________________________________
From: ???? <346531...@qq.com&gt;
Sent: Tuesday, March 31, 2020 12:33
To: user-zh <user-zh@flink.apache.org&gt;
Subject: ProcessWindowFunction??????????????state??

????????


--????
FLINK 1.10.0 ON YARN


--????
1.????????&amp;nbsp; .window(TumblingProcessingTimeWindows.of(Time.days(1)))????
2.????????new 
Trigger(????.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
3.????????new 
ProcessWindowFunction(),????????????????????????????????0??????????????????????????????????????????
--????
&amp;nbsp;?? new 
ProcessWindowFunction()????????ValueState????????????0????????ValueState??????????????????????????ValueState??????????????????????????????????????????.clear()??????????????????????????????????



--????????


&nbsp; .window(TumblingProcessingTimeWindows.of(Time.days(1)))&nbsp;&nbsp; 
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
.process(new ProcessWindowFunction[(String,String,Long), String, Tuple, 
TimeWindow] {


&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; private var pv_st: ValueState[Long] 
= _&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;


&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; override def open(parameters: 
Configuration): Unit = {
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;pv_st = 
getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", 
classOf[Long]))
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }


&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;override def process(key: Tuple, 
context: Context, elements: Iterable[(String,String,Long)], out: 
Collector[String]): Unit = {
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; var c_st = 0
&amp;nbsp; &amp;nbsp;
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; val elementsIterator = 
elements.iterator
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; // 
??????????????????????word
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; while 
(elementsIterator.hasNext) {
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; val ac_name = 
elementsIterator.next()._2
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
if(!ac_name.isEmpty &amp;amp;&amp;amp; ac_name.equals("listentime")){
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
c_st +=1
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; val time: Date = new 
Date()
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; val dateFormat: 
SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; val date = 
dateFormat.format(time)


&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; // add current
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp;pv_st.update(pv_st.value() + c_st)


&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; var jsonStr = 
""+key.getField(0)+"_"+date+"&amp;amp;" // json????????
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; jsonStr += "{"+
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; 
"\"yesterday_foreground_play_pv\":\""+pv_st.value()+
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; "\"}";




&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
//??????????????????????????????????????????????????????????????
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if(stateDate.equals("") 
|| stateDate.equals(date)){
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; stateDate=date
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
out.collect(jsonStr)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }else{
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
out.collect(jsonStr)
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; pv_st.clear()
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; stateDate=date
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }


&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }


&amp;nbsp; &amp;nbsp; &amp;nbsp; })

回复