Hi Timur,

a TaskManager may run as many parallel subtasks of a Map operator as it has slots. Each parallel subtask of an operator runs in a different thread, and each parallel subtask of a Map operator has its own MapFunction object, so it should be possible to use a lazy val.
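For illustration, the per-instance behavior of a lazy val can be sketched in plain Scala, outside of Flink (Client, InitCounter, and MyMapper are hypothetical stand-ins for your non-thread-safe client and a MapFunction instance):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the hypothetical client is initialized.
object InitCounter {
  val count = new AtomicInteger(0)
}

// Stand-in for the expensive, non-thread-safe client.
class Client {
  InitCounter.count.incrementAndGet()
  def transform(s: String): String = s.toUpperCase
}

// Mimics one MapFunction instance: the client is created once per instance,
// lazily on first use -- not once per element.
class MyMapper {
  lazy val client = new Client
  def map(in: String): String = client.transform(in)
}

object LazyValDemo extends App {
  val mapper = new MyMapper
  val out = Seq("a", "b", "c").map(mapper.map)
  println(out)                      // three elements mapped...
  println(InitCounter.count.get())  // ...but the client was built only once
}
```

Because each parallel subtask gets its own MapFunction object, each subtask would initialize exactly one client this way.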
However, you should not use static variables to hold state, because these are shared between all MapFunction instances in a TaskManager (JVM).

2016-04-22 21:21 GMT+02:00 Timur Fayruzov <timur.fairu...@gmail.com>:

> Actually, a follow-up question: is the map function single-threaded (within
> one task manager, that is)? If it's not, then lazy initialization won't
> work, is that right?
>
> On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> You may also be able to initialize the client only in the parallel
>> execution by making it a "lazy" variable in Scala.
>>
>> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>
>>> Outstanding! Thanks, Aljoscha.
>>>
>>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi,
>>>> you could use a RichMapFunction, which has an open() method:
>>>>
>>>> data.map(new RichMapFunction[IN, OUT]() {
>>>>   override def open(parameters: Configuration): Unit = {
>>>>     // initialize client
>>>>   }
>>>>
>>>>   override def map(input: IN): OUT = {
>>>>     // use client
>>>>   }
>>>> })
>>>>
>>>> The open() method is called before any elements are passed to the
>>>> function. The counterpart of open() is close(), which is called after all
>>>> elements are through or when the job is cancelled.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm writing a Scala Flink application. I have a standalone process
>>>>> that exists on every Flink node that I need to call to transform my data.
>>>>> To access this process I need to initialize a non-thread-safe client first.
>>>>> I would like to avoid initializing a client for each element being
>>>>> transformed.
>>>>> A straightforward implementation would be something like this:
>>>>>
>>>>> ```
>>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>>>> val pool = new ArrayBlockingQueue[Client](5)
>>>>> // pool is filled here
>>>>> data.map(e => {
>>>>>   val client = pool.take()
>>>>>   val res = client.transform(e)
>>>>>   pool.put(client)
>>>>>   res
>>>>> })
>>>>> ```
>>>>>
>>>>> However, this causes a runtime exception with the message "Task not
>>>>> serializable", which makes sense.
>>>>>
>>>>> Function parameters and broadcast variables won't work either, as far
>>>>> as I understand. Is there a way to make this happen?
>>>>>
>>>>> Thanks,
>>>>> Timur
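The open()/close() lifecycle Aljoscha describes can also be sketched without a Flink dependency, to show where the client fits (ReversingMapper and Client are hypothetical names; in a real job you would extend Flink's RichMapFunction instead):

```scala
// Stand-in for a non-thread-safe client with an explicit lifecycle.
class Client {
  private var closed = false
  def transform(s: String): String = {
    require(!closed, "client already shut down")
    s.reverse
  }
  def shutdown(): Unit = { closed = true }
}

// Mirrors the shape of RichMapFunction: open() runs once before any element,
// map() runs per element, close() runs once after the last element.
class ReversingMapper {
  private var client: Client = _
  def open(): Unit = { client = new Client }
  def map(in: String): String = client.transform(in)
  def close(): Unit = { client.shutdown() }
}

object LifecycleDemo extends App {
  val f = new ReversingMapper
  f.open()                           // client built once, on the worker
  val out = Seq("ab", "cd").map(f.map)
  f.close()                          // client released after all elements
  println(out)
}
```

Because open() runs on the worker after deserialization, the client itself never has to be serializable, which sidesteps the "Task not serializable" error from the pool-based attempt above.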