Hi, greetings
I am applying a window operation to a DataStream and then some
transformation (it could be anything). Say the window size is 1 minute,
events arrive with strictly increasing timestamps, and the watermark
interval is 1 ms (checkpointing is also enabled). At any moment there is
one open window that is constantly receiving data. If I query this
bucket (state) through queryable state, I get no results. Similarly, if I
print the *minMaxTempPerWindow* stream, output appears only once the
bucket is finalized. I cannot retrieve any results until the window is
finalized; for all finalized buckets the query works. With a window size
of 10 minutes, I would not be able to query the data for up to 10
minutes, which makes this unsuitable for real-time streaming use cases. I
assume there must be some way to query the intermediate state of a
window. Any help would be appreciated.
Below is the code; *state* is what I query later using
*QueryableStateClient*.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// checkpoint every 10 seconds
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1L)

// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
  // SensorSource generates random temperature readings
  .addSource(new ResettableSensorSource)
  // assign timestamps and watermarks which are required for event time
  .assignTimestampsAndWatermarks(new SensorTimeAssigner)

val minMaxTempPerWindow = sensorData
  .keyBy(_.id)
  .window(TumblingEventTimeWindows.of(Time.seconds(60)))
  .process(new HighAndLowTempProcessFunction)

val state = minMaxTempPerWindow
  .keyBy(_.endTs)
  .asQueryableState("highAndLowTemperature")
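
To make the behavior concrete, here is a minimal sketch in plain Scala (no
Flink) of how I understand a tumbling event-time window: readings are
buffered, and nothing is emitted until the watermark reaches the end of the
window. The names TumblingWindowSketch, Reading and MinMax are hypothetical
and not part of the Flink API, and the watermark is assumed to simply follow
the latest timestamp.

```scala
import scala.collection.mutable

// Hypothetical types for illustration only; not the classes used in my job.
final case class Reading(id: String, timestamp: Long, temperature: Double)
final case class MinMax(id: String, min: Double, max: Double, endTs: Long)

// Simplified model of a keyed tumbling event-time window.
class TumblingWindowSketch(windowSizeMs: Long) {

  // Buffered readings per (key, window start); not visible downstream yet.
  private val buffers = mutable.Map.empty[(String, Long), Vector[Reading]]

  // Start of the window that contains the given timestamp.
  private def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Buffer the reading, advance the watermark to its timestamp (timestamps
  // are assumed strictly increasing), and return results of fired windows.
  def process(r: Reading): Seq[MinMax] = {
    val key = (r.id, windowStart(r.timestamp))
    buffers(key) = buffers.getOrElse(key, Vector.empty) :+ r

    val watermark = r.timestamp
    // A window [start, start + size) fires once the watermark reaches its
    // last timestamp, start + size - 1.
    val ready = buffers.keys.toList.filter { case (_, start) =>
      start + windowSizeMs - 1 <= watermark
    }
    ready.map { case k @ (id, start) =>
      val temps = buffers.remove(k).get.map(_.temperature)
      MinMax(id, temps.min, temps.max, start + windowSizeMs)
    }
  }
}
```

In this model, process returns an empty result for every reading that falls
inside the open window, and the min/max for that window only appears once a
reading from the next window moves the watermark forward. That matches what
I see with the queryable state above.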
Thank you
Falak