Hi, greetings

I am applying a window operation on a DataStream and then some
transformation (it could be anything). Let's say the window size is
1 minute, the data arrives with strictly increasing timestamps, and the
watermark interval is 1 ms (checkpointing is also enabled). There would be
one window into which data is constantly arriving. If I try to query this
bucket (state) using queryable state, I don't get any results.
Similarly, if I print the *minMaxTempPerWindow* stream, output appears
only when the bucket is finalized. I am not able to retrieve any results
until that window is finalized. For all other, already finalized buckets I
am able to query the results. If I set the window size to 10 minutes, I
won't be able to query the data for up to 10 minutes, which makes this
unfit for real-time streaming use cases. I think there must be some way to
query the intermediate state of a window. Help would be appreciated.
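
To make my understanding concrete, here is a minimal sketch in plain Scala
(no Flink dependency) of when a 1-minute tumbling window would emit. It
assumes windows are aligned to the epoch with zero offset and that the
default event-time trigger fires once the watermark reaches the last
timestamp in the window (end - 1). The object and method names are made up
for illustration only.

```scala
// Plain Scala sketch of tumbling event-time window arithmetic (no Flink).
// Assumptions: zero window offset, non-negative timestamps in milliseconds.
object WindowFiringSketch {
  val windowSizeMs: Long = 60 * 1000L

  // Start (inclusive) of the tumbling window that contains the timestamp.
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // End (exclusive) of the tumbling window that contains the timestamp.
  def windowEnd(ts: Long): Long = windowStart(ts) + windowSizeMs

  // True once the watermark has reached the window's last timestamp,
  // which is when the window result would be emitted downstream.
  def hasFired(ts: Long, watermark: Long): Boolean =
    watermark >= windowEnd(ts) - 1

  def main(args: Array[String]): Unit = {
    val eventTs = 90000L                 // an event 90 s after the epoch
    println(windowStart(eventTs))        // 60000
    println(windowEnd(eventTs))          // 120000
    println(hasFired(eventTs, 100000L))  // false: window is still open
    println(hasFired(eventTs, 119999L))  // true: watermark reached end - 1
  }
}
```

Under these assumptions, nothing reaches *minMaxTempPerWindow* (and hence
the queryable state fed from it) for the window [60000, 120000) until the
watermark reaches 119999, which matches what I observe.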


Below is the code; *state* is what I query later using
*QueryableStateClient*.

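import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment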

// checkpoint every 10 seconds
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1L)

// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
  // SensorSource generates random temperature readings
  .addSource(new ResettableSensorSource)
  // assign timestamps and watermarks which are required for event time
  .assignTimestampsAndWatermarks(new SensorTimeAssigner)

val minMaxTempPerWindow = sensorData
  .keyBy(_.id)
  .window(TumblingEventTimeWindows.of(Time.seconds(60)))
  .process(new HighAndLowTempProcessFunction)

// expose the per-window results as queryable state, keyed by window end time
val state = minMaxTempPerWindow
  .keyBy(_.endTs)
  .asQueryableState("highAndLowTemperature")

Thank you
Falak
