Any thoughts on this?

On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> I have a very simple program, using the local execution environment, that
> throws an NPE and other exceptions related to concurrent access when
> launching a count on a DataSet from different threads. The program is at
> https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e, and it is
> basically this:
>
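> import java.util.concurrent.Executors
> import scala.concurrent.{Await, ExecutionContext, Future}
> import scala.concurrent.duration._
> import org.apache.flink.api.scala._
>
> def doubleCollectConcurrent = {
>   val env = ExecutionEnvironment.createLocalEnvironment(3)
>   val xs = env.fromCollection(1 to 100).map(_ + 1)
>   // 10 threads, so all 10 count() calls can run at the same time
>   implicit val ec: ExecutionContext =
>     ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
>
>   // Each count() builds and executes a job on the shared environment
>   val pendingActions = Seq.fill(10)(
>     Future { println(s"xs.count = ${xs.count()}") }
>   )
>   val pendingActionsFinished = Future.fold(pendingActions)(()) { (_, _) =>
>     println("pending action finished")
>   }
>   Await.result(pendingActionsFinished, 10.seconds)
>
>   ok // success result of the test framework used in the gist
> }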
>
>
> It looks like the issue is in OperatorTranslation.java at
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
> and happens when a sink is added to the sinks list while that list is
> being traversed. I have the impression that this is by design, so I'd like
> to confirm whether this is the expected behaviour, and whether it happens
> only with the local execution environment or affects all execution
> environment implementations. Other related questions I have are:
>
>    - Is this documented somewhere? I'm quite new to Flink, so I might
>    have missed this. Is there any known workaround for concurrently launching
>    counts and other sink computations on the same DataSet?
>    - Is it safe to perform a sequence of calls to DataSet sink methods
>    like count or collect on the same DataSet, as long as they are performed
>    from the same thread? From my experience it looks like it is, but I'd like
>    to get a confirmation if possible.
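One possible workaround for the first question, sketched under the unverified assumption that the race is confined to concurrent job construction and translation on the shared environment, is to serialize every sink action on the same ExecutionEnvironment behind a single lock. The helper `runSerialized` is hypothetical, not a Flink API. To keep the sketch runnable without Flink, a plain `java.util.ArrayList` stands in for the environment's sink list: each action appends to it and then traverses it, mimicking sink registration followed by plan translation.

```scala
import java.util.ArrayList
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SerializedSinkActions {
  // Hypothetical helper (not part of Flink): runs `action` while holding the
  // monitor of `lock`, so at most one action guarded by the same lock runs at a time.
  def runSerialized[A](lock: AnyRef)(action: => A): A =
    lock.synchronized(action)

  // Stand-in for the shared environment: a non-thread-safe list that each
  // action appends to ("adds a sink") and then iterates ("translates the plan").
  def simulate(nActions: Int): Int = {
    val sinks = new ArrayList[Int]()
    val pool = Executors.newFixedThreadPool(10)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val futures = (1 to nActions).map { i =>
        Future {
          runSerialized(sinks) {
            sinks.add(i)
            var sum = 0
            val it = sinks.iterator()
            while (it.hasNext) sum += it.next()
            sum
          }
        }
      }
      // Without the lock, this could fail with ConcurrentModificationException
      Await.result(Future.sequence(futures), 10.seconds)
      sinks.size
    } finally pool.shutdown()
  }
}
```

With Flink, the call site would presumably become something like `Future { runSerialized(env)(xs.count()) }`. Because the lock is held for the whole `count()`, the jobs then run one after another, which gives up concurrency between jobs in exchange for avoiding the race.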
>
> This might be related to
> https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
> but I'm not sure.
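The ConcurrentModificationException from that question, and the traversal-while-adding problem described for OperatorTranslation.java, can be illustrated without Flink. Assuming the sinks are kept in a plain `java.util.ArrayList` (an assumption, not checked against the Flink source), its fail-fast iterator throws as soon as `next()` is called after the list has been structurally modified. The sketch below triggers this deterministically from a single thread; with several threads the same race is timing-dependent, and since ArrayList is not thread-safe it can plausibly surface as other errors too.

```scala
import java.util.{ArrayList, ConcurrentModificationException}

object FailFastIterationDemo {
  // `sinks` is a hypothetical stand-in for the registered sinks, not Flink's field.
  def traverseWhileAdding(): Either[String, Int] = {
    val sinks = new ArrayList[String]()
    sinks.add("sink-1")
    sinks.add("sink-2")
    var visited = 0
    try {
      val it = sinks.iterator()
      while (it.hasNext) {
        val s = it.next()
        visited += 1
        // Simulates another count() registering its sink mid-traversal
        if (s == "sink-1") sinks.add("sink-3")
      }
      Right(visited)
    } catch {
      case e: ConcurrentModificationException => Left(e.getClass.getSimpleName)
    }
  }

  def main(args: Array[String]): Unit =
    println(traverseWhileAdding()) // prints Left(ConcurrentModificationException)
}
```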
>
> Thanks a lot for your help.
>
> Greetings,
>
> Juan
>
