Hi Peter,
I'm no Scala developer but I may be able to help with some concepts:

* a static reference used inside a [Map]Function may cause problems 
when executed in parallel in the same JVM, e.g. a TaskManager with multiple 
slots, depending on whether this static object is stateful and/or thread-safe
* additionally, not all parallel instances of your map may be executed in the 
same JVM, e.g. on multiple TaskManagers, so you cannot assume that the state 
of the JsonMapper is consistent among them
* if the ObjectMapper does not store any state that is worth recovering during 
a failure (none that I see from
https://fasterxml.github.io/jackson-databind/javadoc/2.3.0/com/fasterxml/jackson/databind/ObjectMapper.html
if that is the one you are using), then you don't need to put it into Flink 
state but can either initialise it as a (non-static) member of your 
MapFunction class or even in your map function itself
* for the correct use of keyed/non-keyed state, please refer to my other email 
or [1]
* for 'class' vs. 'object': if you're using 
com.fasterxml.jackson.databind.ObjectMapper as described above, you'll have 
state again ("It will use instances of JsonParser and JsonGenerator for 
implementing actual reading/writing of JSON." from the docs). In general, 
though, it is a good question whether the singleton would work for stateless 
operators and whether it actually improves performance.
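
To make the "non-static member" option concrete, here is a minimal sketch of 
the RichMapFunction.open variant you asked about. It assumes Flink 1.x and 
jackson-databind on the classpath; the class name JsonParseFunction is made up 
for illustration, and I have not run this against your job:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical name. Flink creates one instance of this function per
// parallel subtask, so each subtask ends up with its own ObjectMapper.
class JsonParseFunction extends RichMapFunction[String, ObjectNode] {

  // @transient: the mapper is not shipped with the serialized function;
  // it is created on the TaskManager in open() instead.
  @transient private var mapper: ObjectMapper = _

  // Called once per parallel instance before any call to map().
  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
  }

  // Parses one JSON object per input record; throws if the input is
  // not valid JSON or is not a JSON object.
  override def map(value: String): ObjectNode =
    mapper.readValue(value, classOf[ObjectNode])
}
```

With this layout nothing is shared between parallel instances, so the question 
of whether a shared static mapper is thread-safe does not come up, and the 
mapper is created once per subtask rather than once per record.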


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

On Thursday, 3 August 2017 12:41:36 CEST Peter Ertl wrote:
> Hi flink users,
> 
> I just wanted to ask if this kind of scala map function is correct?
> 
> object JsonMapper {
>   private val mapper: ObjectMapper = new ObjectMapper()
> }
> 
> class JsonMapper extends MapFunction[String, ObjectNode] {
>   override def map(value: String): ObjectNode =
>     JsonMapper.mapper.readValue(value, classOf[ObjectNode])
> }
> 
> Is using a static reference to ObjectMapper fine or will this cause issues
> on a distributed cluster / with checkpoint / serializing state / whatever ?
> 
> Or should I instead use a non-transient property initialized in ctor
> (ObjectMapper is java.io.Serializable) ?
> 
> Or should I initialize it with RichMapFunction.open into a transient
> property?
> 
> Also I am wondering if replacing 'class' with 'object' (=> singleton)
> 
> object JsonMapper extends MapFunction[String, ObjectNode] { /* ..*/ }
> 
> is ok (actually the mapper is stateless so no obvious need to re-instantiate
> it again and again ? )
> 
> Thanks and best regards
> Peter
