Actually, a follow-up question: is the map function single-threaded (within one task manager, that is)? If it's not, then lazy initialization won't work, is that right?
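
To make the question concrete, here is a minimal sketch of what I understand by the lazy approach. It uses only the JDK, not Flink: `Client`, `Transformer` and `roundTrip` are hypothetical names made up for illustration, and the serialization round trip is only my assumption about how a function object gets copied to each parallel instance. It shows that a `@transient lazy val` is not shipped with the object and is rebuilt once per deserialized copy. It does not answer whether `map` is called from a single thread: Scala makes the initialization of a lazy val thread-safe, but later calls on the client are not protected.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the non-thread-safe, non-serializable client.
class Client {
  def transform(s: String): String = s.toUpperCase
}

object Client {
  // Counts how many clients have been constructed in this JVM.
  val created = new AtomicInteger(0)
}

// Hypothetical stand-in for the user function that gets serialized and shipped.
class Transformer extends Serializable {
  // @transient: the client is never serialized with the function.
  // lazy: it is built on first use, once per (deserialized) instance.
  @transient private lazy val client: Client = {
    Client.created.incrementAndGet()
    new Client
  }

  def map(in: String): String = client.transform(in)
}

object LazyClientDemo {
  // Serializes and deserializes an object, yielding an independent copy.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new Transformer
    val copy1 = roundTrip(original) // imagine: parallel instance 1
    val copy2 = roundTrip(original) // imagine: parallel instance 2

    println(copy1.map("a"))
    println(copy1.map("b")) // reuses the client created by the previous call
    println(copy2.map("c")) // copy2 builds its own client

    // One client per copy that was actually used; `original` never built one.
    println(s"clients created: ${Client.created.get}")
  }
}
```

If each parallel instance calls `map` from only one thread, this would give one client per instance with no sharing; if several threads can call `map` on the same instance, the client would still need a lock or a pool around it.
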

On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote:

> You may also be able to initialize the client only in the parallel
> execution by making it a "lazy" variable in Scala.
>
> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
>> Outstanding! Thanks, Aljoscha.
>>
>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> you could use a RichMapFunction that has an open method:
>>>
>>> data.map(new RichMapFunction[...]() {
>>>   override def open(parameters: Configuration): Unit = {
>>>     // initialize client
>>>   }
>>>
>>>   override def map(input: IN): OUT = {
>>>     // use client
>>>   }
>>> })
>>>
>>> The open() method is called before any elements are passed to the
>>> function. The counterpart of open() is close(), which is called after all
>>> elements are through or if the job is cancelled.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm writing a Scala Flink application. I have a standalone process that
>>>> exists on every Flink node that I need to call to transform my data. To
>>>> access this process I need to initialize a non-thread-safe client first. I
>>>> would like to avoid initializing a client for each element being
>>>> transformed. A straightforward implementation would be something like this:
>>>> ```
>>>>
>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>>> val pool = new ArrayBlockingQueue[Client](5)
>>>> // pool is filled here
>>>> data.map(e => {
>>>>   val client = pool.take()
>>>>   val res = client.transform(e)
>>>>   pool.put(client)
>>>>   res
>>>> })
>>>>
>>>> ```
>>>> However, this causes a runtime exception with the message "Task not
>>>> serializable", which makes sense.
>>>>
>>>> Function parameters and broadcast variables won't work either, as far as
>>>> I understand. Is there a way to make this happen?
>>>>
>>>> Thanks,
>>>> Timur