Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-05-02 Thread Juan Rodríguez Hortalá
Thanks for your answer, Fabian.

In my opinion this is not just a possible new feature for an optimization,
but a bigger problem: the client program crashes with an exception when
concurrent counts or collects are triggered on the same data set, and this
happens nondeterministically, depending on how the threads are scheduled.
So this should be documented somewhere.

Just my two cents.

Thanks,

Juan



Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-29 Thread Fabian Hueske
Hi Juan,

count() and collect() trigger the execution of a job.
Since Flink does not cache intermediate results (yet), all operations from
the sink (count()/collect()) to the sources are executed.
So in a sense a DataSet is immutable (given that the inputs of the sources
do not change), but it is completely recomputed for every execution.
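As an analogy only (plain Scala collections, no Flink involved), a collection view behaves like the DataSet described above: it is a recipe that is re-run from its source every time an action traverses it, whereas materializing it once and deriving several results locally runs the mapping step a single time. For a small DataSet, the corresponding pattern would be one collect() whose returned Seq is then counted locally, keeping in mind that collect() results must stay small. `RecomputationAnalogy` and its methods are illustrative names, not part of any API.

```scala
// Analogy only: plain Scala collections, no Flink involved.
object RecomputationAnalogy {
  // How often the mapping function has run (illustrative counter).
  private var evaluations = 0

  // A lazy "recipe": like a DataSet, it describes a computation that
  // runs anew every time an action traverses it.
  private def recipe = (1 to 100).view.map { x => evaluations += 1; x + 1 }

  /** Two actions on the lazy recipe: (sum, max, map evaluations). */
  def lazyActions(): (Int, Int, Int) = {
    evaluations = 0
    val xs = recipe
    val s = xs.sum // first traversal runs the map step for every element
    val m = xs.max // second traversal runs it for every element again
    (s, m, evaluations)
  }

  /** Materialize once, then derive both results locally. */
  def materializedActions(): (Int, Int, Int) = {
    evaluations = 0
    val local = recipe.toList // single traversal, comparable to one collect()
    (local.sum, local.max, evaluations)
  }

  def main(args: Array[String]): Unit = {
    println(s"lazy: ${lazyActions()}")
    println(s"materialized: ${materializedActions()}")
  }
}
```

Both variants compute the same sum and max, but the lazy one runs the map step twice as often, mirroring two job executions versus one.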

There are currently some efforts [1] under way to improve Flink's behavior
for interactive sessions.

Best, Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
[2]
https://lists.apache.org/thread.html/5f4961f1dfe23204631fd6f2b3227724ce9831f462737f51742a52c1@%3Cdev.flink.apache.org%3E



Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-26 Thread Juan Rodríguez Hortalá
Hi Timo,

Thanks for your answer. I was surprised to have problems calling those
methods concurrently, because I thought data sets were immutable. Now I
understand that calling count or collect mutates the data set: not its
contents, but some kind of execution plan associated with it.
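To illustrate the kind of shared state being described, here is a deliberately simplified, hypothetical model in plain Scala; it is not Flink's code, and `ToyEnvironment`, `execute` and `ToyEnvironmentDemo` are made-up names. It assumes, for illustration, that each count()/collect() appends a sink to one list owned by the environment, traverses that list to build a plan, and then clears it. Holding a single lock across those three steps keeps concurrent submissions from seeing each other's sinks, which has the same effect as issuing the calls sequentially.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical toy model, NOT Flink's code: an "environment" whose pending
// sinks live in one shared, mutable buffer.
final class ToyEnvironment {
  private val sinks = ArrayBuffer.empty[String]

  // Holding the lock across all three steps is what makes this safe; without
  // it, another thread could append to or clear `sinks` mid-traversal.
  def execute(sink: String): List[String] = this.synchronized {
    sinks += sink           // 1. count()/collect() registers its sink
    val plan = sinks.toList // 2. the whole list is traversed into a plan
    sinks.clear()           // 3. the list is reset for the next job
    plan
  }
}

object ToyEnvironmentDemo {
  /** Runs `n` submissions from a 10-thread pool; returns the plan each one built. */
  def run(n: Int): Seq[List[String]] = {
    val pool = Executors.newFixedThreadPool(10)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val env = new ToyEnvironment
    try {
      val jobs = (1 to n).map(i => Future(env.execute(s"count-$i")))
      Await.result(Future.sequence(jobs), 10.seconds)
    } finally pool.shutdown()
  }
}
```

With the lock, every plan contains exactly the sink of the call that built it; removing `this.synchronized` reintroduces the unsafe pattern, where plans may pick up other calls' sinks or the buffer may change while it is being read.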

I suggest adding a remark about this lack of thread safety to the
documentation. Maybe it’s already there but I haven’t seen it. I also
understand that repeated calls to collect and count on the same data set are
OK as long as they are made sequentially, not concurrently.

Thanks,

Juan



Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-26 Thread Timo Walther

Hi Juan,

as far as I know we do not provide any concurrency guarantees for
count() or collect(). Those methods need to be used with caution anyway,
as the result size must not exceed a certain threshold. I will loop in
Fabian, who might know more about the internals of the execution.


Regards,
Timo




Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-25 Thread Juan Rodríguez Hortalá
Any thoughts on this?



Exceptions when launching counts on a Flink DataSet concurrently

2019-04-07 Thread Juan Rodríguez Hortalá
Hi,

I have a very simple program, using the local execution environment, that
throws NPEs and other exceptions related to concurrent access when counts
are launched on a DataSet from different threads. The program is
https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e and is
basically this:

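import java.util.concurrent.Executors
import org.apache.flink.api.scala._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

def doubleCollectConcurrent = {
  val env = ExecutionEnvironment.createLocalEnvironment(3)
  val xs = env.fromCollection(1 to 100).map { _ + 1 }
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  // Launch 10 count() jobs on the same DataSet from different threads
  val pendingActions = Seq.fill(10)(
    Future { println(s"xs.count = ${xs.count()}") }
  )
  val pendingActionsFinished = Future.fold(pendingActions)(()) { (_, _) =>
    println("pending action finished")
  }
  Await.result(pendingActionsFinished, 10.seconds)

  ok // test-framework success result from the gist (presumably specs2)
}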


It looks like the issue is in OperatorTranslation.java at
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
where a sink is added to the sinks list while that list is being traversed.
I have the impression that this is by design, so I'd like to confirm that
this is the expected behaviour, and whether it happens only for the local
execution environment or affects all execution environment
implementations. Other related questions I have are:

   - Is this documented somewhere? I'm quite new to Flink, so I might have
   missed this. Is there any known workaround for concurrently launching
   counts and other sink computations on the same DataSet?
   - Is it safe to perform a sequence of calls to DataSet sink methods, like
   count or collect, on the same DataSet, as long as they are made from the
   same thread? In my experience it is, but I'd like to get confirmation if
   possible.
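For the first question, one possible workaround, consistent with the observation in the second question that sequential calls on the same DataSet work, is to route every job-triggering call through a single dedicated thread. This sketch assumes that serializing the calls is sufficient, which this thread suggests for the local environment but does not confirm for every execution environment. `SerialJobRunner` and `SerialJobRunnerDemo` are hypothetical helpers, not Flink APIs; in a Flink program each submitted job would be a call such as `runner.submit(xs.count())`.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, NOT a Flink API: every submitted job runs on one
// dedicated thread, so job-triggering calls such as xs.count() never overlap.
final class SerialJobRunner extends AutoCloseable {
  private val executor = Executors.newSingleThreadExecutor()
  private val serialEc: ExecutionContext = ExecutionContext.fromExecutor(executor)

  /** Queues `job`; queued jobs run one after another on the single thread. */
  def submit[T](job: => T): Future[T] = Future(job)(serialEc)

  override def close(): Unit = executor.shutdown()
}

object SerialJobRunnerDemo {
  /** Submits `n` stand-in jobs from concurrent callers; returns the results
    * and whether any two jobs were ever running at the same time. */
  def run(n: Int): (Seq[Int], Boolean) = {
    import ExecutionContext.Implicits.global // concurrent callers + combining futures
    val running = new AtomicInteger(0)
    val overlapped = new AtomicBoolean(false)
    val runner = new SerialJobRunner
    try {
      val jobs = (1 to n).map { i =>
        // Each caller runs on the global pool, so submissions are concurrent...
        Future(runner.submit {
          // ...but the jobs themselves run strictly one at a time.
          if (running.incrementAndGet() > 1) overlapped.set(true)
          Thread.sleep(5) // stand-in for a job such as xs.count()
          running.decrementAndGet()
          i
        }).flatMap(inner => inner)
      }
      (Await.result(Future.sequence(jobs), 10.seconds), overlapped.get())
    } finally runner.close()
  }
}
```

A single-thread executor is used instead of an explicit lock so that callers get a Future back instead of blocking while they wait for their turn; wrapping every call in `synchronized` on the shared ExecutionEnvironment would serialize the calls just as well, but blocks the calling threads.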

This might be related to
https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
but I'm not sure.

Thanks a lot for your help.

Greetings,

Juan