Hi Juan, The state will be purged if you return None instead of a Some. However, this only happens when the function is called for a specific key, i.e., state won't be automatically removed after some time. If this is your use case, you have to implement a ProcessFunction and use timers to manually clean up the state.
Best, Fabian 2018-08-08 19:02 GMT+02:00 Juan Gentile <j.gent...@criteo.com>: > Hello, > > > I'm looking at the following page of the documentation > > https://ci.apache.org/projects/flink/flink-docs- > stable/dev/stream/state/state.html > > particularly at this piece of code: > > > val stream: DataStream[(String, Int)] = ... > val counts: DataStream[(String, Int)] = stream > .keyBy(_._1) > .mapWithState((in: (String, Int), count: Option[Int]) => > count match { > case Some(c) => ( (in._1, c), Some(c + in._2) ) > case None => ( (in._1, 0), Some(in._2) ) > }) > > > How is the state clear/purge in this case for keys that no longer appear? > > > Thank you, > > Juan >