Hi,

I want to get the last value stored in ValueState when processing element in 
Trigger.

But as the log shows that sometimes I can get the value, sometimes not.

Only one key in my data(SensorReading).

ValueState:
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
classOf[Long])
var value = 1
override def onElement( r: SensorReading, timestamp: Long, window: TimeWindow, 
ctx: Trigger.TriggerContext): TriggerResult = {

  println("before update value : " + 
ctx.getPartitionedState(descriptor).value())

   ctx.getPartitionedState(descriptor).update(value)

   value += 1

   println("after update value: " + ctx.getPartitionedState(descriptor).value())

   ctx.registerProcessingTimeTimer(window.maxTimestamp)
   TriggerResult.CONTINUE
}

override def onEventTime(time: Long, window: TimeWindow, ctx: 
Trigger.TriggerContext) = TriggerResult.CONTINUE

override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
Trigger.TriggerContext) = TriggerResult.FIRE

override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
   ctx.deleteProcessingTimeTimer(window.maxTimestamp)
 }

override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
   val windowMaxTimestamp = window.maxTimestamp
   if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
ctx.registerProcessingTimeTimer(windowMaxTimestamp)
 }

override def canMerge: Boolean = true

}

Main process:
object MyCustomWindows {

def main(args: Array[String]): Unit = {

   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.getConfig.setAutoWatermarkInterval(1000L)

   val sensorData: DataStream[SensorReading] = env
     .addSource(new SensorSource)
     .assignTimestampsAndWatermarks(new SensorTimeAssigner)

   val countsPerThirtySecs = sensorData
     .keyBy(_.id)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
     .trigger(new ProcessingTimeTrigger)
     .process(new CountFunction)

   env.execute()
 }
}

Log results:

        before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia

Reply via email to