Hi,

as per the mail subject, I wanted to ask whether state access (read and
write) is synchronized.

I have the following stream:

val airtrafficEvents = stream
  .keyBy(_.flightInfo.flight)
  .map(new UpdateIdFunction())


where UpdateIdFunction is a RichMapFunction holding a ValueState and a
MapState, with the following map method:

def map(value: AirTrafficEvent): AirTrafficEventWithId = {

  val flight = value.flightInfo.flight
  val time = value.instantValues.time

  AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))

}

private def createOrGetId(_key: String, _time: Long): Int = {

  val tmpId = valuestate.value

  //Remove from MapState entries older than one minute

  val entry = Option[(Int, Long)](lookupMap.get(_key))

  //update ValueState or MapState if needed

  // return the current, updated ValueState or the corresponding ID from the
  // updated MapState

}
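
To make the elided steps concrete, here is a minimal plain-Scala sketch of the logic described by the comments above, with no Flink involved. Everything in it is an assumption made for illustration: the class name IdTracker, the fields counter and lookup (standing in for the ValueState and the MapState), the retentionMillis parameter, and the choice to refresh the timestamp when a key is seen again. It only shows the read-then-update pattern the question is about, not the actual implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the stateful part of UpdateIdFunction.
// `counter` plays the role of the ValueState, `lookup` of the MapState.
final class IdTracker(retentionMillis: Long = 60000L) {
  private var counter: Int = 0
  private val lookup = mutable.Map.empty[String, (Int, Long)]

  def createOrGetId(key: String, time: Long): Int = {
    // Remove entries last seen more than `retentionMillis` before `time`
    val expired = lookup.filter { case (_, (_, seen)) => time - seen > retentionMillis }.keys.toList
    lookup --= expired

    lookup.get(key) match {
      case Some((id, _)) =>
        // Known key: keep its ID and refresh the timestamp (assumed behaviour)
        lookup.update(key, (id, time))
        id
      case None =>
        // New or expired key: take the next incremental ID and remember it
        counter += 1
        lookup.update(key, (counter, time))
        counter
    }
  }
}

object IdTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new IdTracker()
    println(tracker.createOrGetId("AZ123", 0L))      // first sighting
    println(tracker.createOrGetId("AZ123", 30000L))  // same flight within a minute
    println(tracker.createOrGetId("AZ123", 100000L)) // same flight after expiry
  }
}
```

In this single-threaded sketch every call obviously sees the writes made by the previous call; the question is whether the same holds for the ValueState and MapState inside the map function.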

So, I'm using the MapState to track the integer IDs of the events in the
stream, retaining only the most recent records, and I'm using the ValueState
to generate an incremental integer ID for those events.
Given all of this, I'm not sure how the map function is applied to the input
KeyedStream: is it guaranteed that each time the method is called, I read the
latest, updated value/map?

Thank you for your attention,
Federico
