Outstanding! Thanks, Aljoscha.

On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> you could use a RichMapFunction that has an open method:
>
> data.map(new RichMapFunction[...]() {
>   // Configuration is org.apache.flink.configuration.Configuration
>   override def open(parameters: Configuration): Unit = {
>     // initialize client
>   }
>
>   override def map(input: IN): OUT = {
>     // use client
>   }
> })
>
> The open() method is called before any elements are passed to the
> function. The counterpart of open() is close(), which is called after all
> elements have been processed or when the job is canceled.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm writing a Scala Flink application. I have a standalone process that
>> exists on every Flink node that I need to call to transform my data. To
>> access this process I need to initialize a non-thread-safe client first. I
>> would like to avoid initializing a client for each element being
>> transformed. A straightforward implementation would be something like this:
>> ```
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>> val pool = new ArrayBlockingQueue[Client](5)
>> // pool is filled here
>> data.map(e => {
>>   val client = pool.take()
>>   val res = client.transform(e)
>>   pool.put(client)
>>   res
>> })
>> ```
>> However, this causes a runtime exception with the message "Task not
>> serializable", which makes sense.
>>
>> Function parameters and broadcast variables won't work either, as far as I
>> understand. Is there a way to make this happen?
>>
>> Thanks,
>> Timur
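
For anyone finding this thread later, here is a minimal sketch of how the suggestion above might look when written out in full. It assumes the batch DataSet API and the open(Configuration) signature that Flink had at the time of this thread; newer releases may differ. Client here is a hypothetical stand-in for the real non-thread-safe client, its shutdown() method and the field name on MyKey are made up for illustration, and ClientMapper and ClientMapperJob are just names picked for this example.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Field name `value` is an assumption; the thread only shows MyKey(Some("a")).
case class MyKey(value: Option[String])

// Hypothetical stand-in for the real client. It is not Serializable,
// which is why it cannot be captured in a closure on the driver side.
class Client {
  def transform(key: MyKey): MyKey = key.copy(value = key.value.map(_.toUpperCase))
  def shutdown(): Unit = () // assumed cleanup hook, not from the thread
}

class ClientMapper extends RichMapFunction[MyKey, MyKey] {
  // Created in open() on the worker, so it is never shipped with the function.
  @transient private var client: Client = _

  // Called once per parallel instance before any element is passed to map().
  override def open(parameters: Configuration): Unit = {
    client = new Client()
  }

  // Reuses the same client for every element handled by this instance.
  override def map(input: MyKey): MyKey = client.transform(input)

  // Called after all elements are processed or when the job is canceled.
  override def close(): Unit = {
    if (client != null) client.shutdown()
  }
}

object ClientMapperJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
    // print() triggers execution in the DataSet API.
    data.map(new ClientMapper).print()
  }
}
```

Since each parallel instance of the function gets its own copy and calls open() itself, every instance ends up with its own client, so the blocking-queue pool from the original attempt should not be needed.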