Hi, I want to get the last value stored in ValueState when processing element in Trigger.
But as the log shows that sometimes I can get the value, sometimes not. Only one key in my data(SensorReading). ValueState: class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] { private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long]) var value = 1 override def onElement( r: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = { println("before update value : " + ctx.getPartitionedState(descriptor).value()) ctx.getPartitionedState(descriptor).update(value) value += 1 println("after update value: " + ctx.getPartitionedState(descriptor).value()) ctx.registerProcessingTimeTimer(window.maxTimestamp) TriggerResult.CONTINUE } override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.CONTINUE override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.FIRE override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = { ctx.deleteProcessingTimeTimer(window.maxTimestamp) } override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = { val windowMaxTimestamp = window.maxTimestamp if (windowMaxTimestamp > ctx.getCurrentProcessingTime) ctx.registerProcessingTimeTimer(windowMaxTimestamp) } override def canMerge: Boolean = true } Main process: object MyCustomWindows { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointInterval(10 * 1000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(1000L) val sensorData: DataStream[SensorReading] = env .addSource(new SensorSource) .assignTimestampsAndWatermarks(new SensorTimeAssigner) val countsPerThirtySecs = sensorData .keyBy(_.id) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000))) .trigger(new ProcessingTimeTrigger) .process(new CountFunction) env.execute() } } Log results: before update value : null after update value: 1 before update value : null after update value: 2 before update value : null after update value: 3 before update value : 3 after update value: 4 before update value : null after update value: 5 before update value : null after update value: 6 before update value : null after update value: 7 before update value : null after update value: 8 before update value : null after update value: 9 before update value : 9 after update value: 10 Best regards Utopia