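Hello,

state is local to each parallel instance of an operator, and after a
keyBy all records with the same key are routed to the same instance,
where keyed state is scoped to the key of the record being processed.
Coupled with the fact that the "map" method is always called by the
same thread (and never concurrently), the ValueState (or any state for
that matter) will always return the latest values, so no explicit
synchronization is needed.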
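
To make that concrete, here is a minimal sketch (not your actual
function) of a keyed counter, assuming the Flink 1.x RichMapFunction
API with open(Configuration). The class name CountPerKey, the state
name "count" and the tuple types are made up for the example.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hypothetical example: emits a running count per key.
// Must be applied to a keyed stream, since it uses keyed state.
class CountPerKey extends RichMapFunction[(String, Long), (String, Long)] {

  // Handle to the keyed state; Flink binds it to the key of the
  // record currently being processed before each map() call.
  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
  }

  override def map(in: (String, Long)): (String, Long) = {
    // value() returns null if nothing has been stored for this key yet.
    val current = Option(count.value()).map(_.longValue).getOrElse(0L)
    val updated = current + 1
    // No locking needed: map() is never invoked concurrently on this instance.
    count.update(updated)
    (in._1, updated)
  }
}
```

Applied as something like stream.keyBy(_._1).map(new CountPerKey()),
each key only ever sees and updates its own counter. Note that this
also means a ValueState read inside a keyed function is per key, not a
single counter shared by the whole operator instance.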
On 10.09.2017 14:39, Federico D'Ambrosio wrote:
Hi,
as per the mail subject I wanted to ask you if a State access (read
and write) is synchronized.
I have the following stream:
    val airtrafficEvents = stream
      .keyBy(_.flightInfo.flight)
      .map(new UpdateIdFunction())
where UpdateIdFunction is a RichMapFunction with a ValueState and a
MapState, with the following map method:
    def map(value: AirTrafficEvent): AirTrafficEventWithId = {
      val flight = value.flightInfo.flight
      val time = value.instantValues.time
      AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))
    }

    private def createOrGetId(_key: String, _time: Long): Int = {
      val tmpId = valuestate.value
      // Remove from MapState entries older than one minute
      val entry = Option[(Int, Long)](lookupMap.get(_key))
      // update ValueState or MapState if needed
      // return current updated ValueState or corresponding ID from
      // updated MapState
    }
So, I'm using the MapState to track the integer IDs of the events of
the stream, retaining only the latest records inside the MapState, and
I'm using the ValueState to generate an incremental integer ID for
said events.
Given all of this, I'm really not sure how the mapping is applied to
the input KeyedStream: is it guaranteed that each time the method
is called I'm getting the latest, updated value/map?
Thank you for your attention,
Federico