Hi, Falak
>>>Now if I try to query this bucket  (state) using a queryable state, then
i don't get the results.
AFAIK, Flink does not provide a way for users to query the state of the
`WindowOperator`. Doing so would require exposing the window operator's
internal implementation, which would be difficult to maintain if that
implementation changes.

>>>Similarly if I print the *minMaxTempPerWindow* stream, it is printed
only when the bucket is finalized.
This is because the default trigger of `TumblingEventTimeWindows` only fires
once the watermark passes the end of the window, so the result is emitted
only then. If you want to get the result earlier, you could customize the
window's trigger [1] yourself.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#fire-and-purge
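
For example, here is a minimal sketch of the trigger approach. It assumes the
built-in `ContinuousEventTimeTrigger` is good enough for your case, and the
5-second interval is only my assumption, not something taken from your job.
`sensorData` and `HighAndLowTempProcessFunction` are the names from your
snippet.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// sensorData and HighAndLowTempProcessFunction come from the original job.
val minMaxTempPerWindow = sensorData
  .keyBy(_.id)
  .window(TumblingEventTimeWindows.of(Time.seconds(60)))
  // Replace the default trigger: fire early every 5 seconds of event time
  // (assumed interval) instead of only once when the window closes.
  .trigger(ContinuousEventTimeTrigger.of[TimeWindow](Time.seconds(5)))
  .process(new HighAndLowTempProcessFunction)
```

Each firing should call the process function with all elements buffered in the
window so far, so downstream operators see several results per window. Since
you key the queryable state by `endTs`, a later result for the same window
should overwrite the earlier one.
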
Best,
Guowei


On Mon, Jan 25, 2021 at 12:47 PM Falak Kansal <[email protected]>
wrote:

> Hi, greetings
>
> I am applying window operations on a datastream. Then I apply some
> transformation (it could be anything). Let's say I keep the window size to
> 1 minute and data is coming in a strictly increasing timestamp and let's
> say watermark is 1 ms (checkpointing is also enabled). There would be a one
> window, where data will be constantly coming. Now if I try to query this
> bucket  (state) using a queryable state, then i don't get the results.
> Similarly if I print the *minMaxTempPerWindow* stream, it is printed
> only when the bucket is finalized. I am not able to retrieve the results
> until that window is finalized. For all other finalized buckets i am able
> to query the results. If I keep the window size to 10 minutes that means I
> won't be able to query the data for up to 10 minutes, which makes it unfit for
> real time streaming use cases. I think there must be some way to query the
> intermediate state of a window. Help would be appreciated. Thank you.
>
>
> Below is the code, *state *is what i am querying later using
> *QueryableStateClient.*
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> // checkpoint every 10 seconds
> env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
>
> // use event time for the application
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> // configure watermark interval
> env.getConfig.setAutoWatermarkInterval(1L)
>
> // ingest sensor stream
> val sensorData: DataStream[SensorReading] = env
>   // SensorSource generates random temperature readings
>   .addSource(new ResettableSensorSource)
>   // assign timestamps and watermarks which are required for event time
>   .assignTimestampsAndWatermarks(new SensorTimeAssigner)
>
> val minMaxTempPerWindow = sensorData
>   .keyBy(_.id)
>   .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>   .process(new HighAndLowTempProcessFunction)
>
> val state = 
> minMaxTempPerWindow.keyBy(_.endTs).asQueryableState("highAndLowTemperature")
>
> Thank you
> Falak
>
>
