Hi Timur,

A TaskManager may run as many parallel subtasks of a Map operator as it
has slots. Each subtask runs in its own thread and has its own
MapFunction object, so it should be possible to use a lazy val.
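
A minimal sketch of the lazy val approach, assuming a hypothetical
non-thread-safe `Client` class with a `transform` method (both are
stand-ins for the client in the question, not part of Flink). The field
is marked `@transient` so the client is not shipped with the serialized
function; each copy creates its own client on first use.

```scala
import org.apache.flink.api.common.functions.MapFunction

// Hypothetical stand-in for the non-thread-safe client from the question.
class Client {
  def transform(s: String): String = s.toUpperCase
}

// Each parallel subtask works on its own copy of this function object,
// so each copy lazily creates and then reuses its own Client.
class ClientMapper extends MapFunction[String, String] {
  // @transient keeps the client out of the serialized function;
  // it is created on the first call to map().
  @transient private lazy val client: Client = new Client

  override def map(value: String): String = client.transform(value)
}
```

Assuming `data` is a DataSet[String], this could be used as
`data.map(new ClientMapper)`.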

However, you should not use static variables to hold state, because they
are shared among all MapFunction instances in a TaskManager (JVM).
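
To illustrate the difference, here is a plain-Scala sketch without any
Flink classes. The names `SharedState`, `Worker` and `Demo` are
hypothetical. A member of a Scala `object` behaves like a Java static and
exists once for all instances, while an instance field exists once per
object.

```scala
// Members of a Scala `object` behave like Java statics: a single copy
// that every instance (and every thread using one) sees.
object SharedState {
  var counter: Int = 0
}

// Hypothetical stand-in for a user function; not a Flink class.
class Worker {
  // Instance field: each Worker object has its own copy.
  var localCounter: Int = 0

  def process(): Unit = {
    localCounter += 1
    // Unsynchronized write to shared state: unsafe if several threads
    // call process() on different Worker instances at the same time.
    SharedState.counter += 1
  }
}

object Demo {
  // Runs two workers sequentially and returns
  // (first worker's count, second worker's count, shared count).
  def run(): (Int, Int, Int) = {
    val a = new Worker
    val b = new Worker
    a.process(); a.process()
    b.process()
    (a.localCounter, b.localCounter, SharedState.counter)
  }
}
```

Called once in a fresh JVM, `Demo.run()` returns (2, 1, 3): the instance
counters stay separate, while the shared counter accumulates the calls of
both workers.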

2016-04-22 21:21 GMT+02:00 Timur Fayruzov <timur.fairu...@gmail.com>:

> Actually, a follow-up question: is a map function single-threaded (within
> one TaskManager, that is)? If it's not, then lazy initialization won't
> work, is that right?
>
> On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> You may also be able to initialize the client only in the parallel
>> execution by making it a "lazy" variable in Scala.
>>
>> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <
>> timur.fairu...@gmail.com> wrote:
>>
>>> Outstanding! Thanks, Aljoscha.
>>>
>>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> you could use a RichMapFunction that has an open method:
>>>>
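>>>> import org.apache.flink.api.common.functions.RichMapFunction
>>>> import org.apache.flink.configuration.Configuration
>>>>
>>>> data.map(new RichMapFunction[IN, OUT]() {
>>>>   // called once per parallel instance, before any element arrives
>>>>   override def open(parameters: Configuration): Unit = {
>>>>     // initialize client
>>>>   }
>>>>
>>>>   override def map(input: IN): OUT = {
>>>>     // use client
>>>>   }
>>>> })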
>>>>
>>>> The open() method is called before any elements are passed to the
>>>> function. Its counterpart, close(), is called after all elements have
>>>> been processed or when the job is canceled.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm writing a Scala Flink application. I have a standalone process
>>>>> that exists on every Flink node that I need to call to transform my
>>>>> data. To access this process I first need to initialize a
>>>>> non-thread-safe client. I would like to avoid initializing a client
>>>>> for each element being transformed. A straightforward implementation
>>>>> would be something like this:
>>>>> ```
>>>>>
>>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>>>> val pool  = new ArrayBlockingQueue[Client](5)
>>>>> // pool is filled here
>>>>> data.map(e => {
>>>>>   val client = pool.take()
>>>>>   val res = client.transform(e)
>>>>>   pool.put(client)
>>>>>   res
>>>>> })
>>>>>
>>>>> ```
>>>>> However, this causes a runtime exception with message "Task not
>>>>> serializable", which makes sense.
>>>>>
>>>>> Function parameters and broadcast variables won't work either as far
>>>>> as I understand. Is there a way to make this happen?
>>>>>
>>>>> Thanks,
>>>>> Timur
>>>>>
>>>>
>>>
>>
>
