Any thoughts on this?

On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:
> Hi,
>
> I have a very simple program using the local execution environment that
> throws an NPE and other exceptions related to concurrent access when
> launching a count for a DataSet from different threads. The program is
> https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e, which is
> basically this:
>
>   def doubleCollectConcurrent = {
>     val env = ExecutionEnvironment.createLocalEnvironment(3)
>     val xs = env.fromCollection(1 to 100).map{_+1}
>     implicit val ec =
>       ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
>
>     val pendingActions = Seq.fill(10)(
>       Future { println(s"xs.count = ${xs.count}") }
>     )
>     val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) =>
>       println("pending action finished")
>       Unit
>     }
>     Await.result(pendingActionsFinished, 10 seconds)
>
>     ok
>   }
>
> It looks like the issue is in OperatorTranslation.java at
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
> when a sink is added to the sinks list while that list is being traversed.
> I have the impression that this is by design, so I'd like to confirm that
> this is the expected behaviour, and whether it happens only for the local
> execution environment or affects all execution environment
> implementations. Other related questions I have are:
>
> - Is this documented somewhere? I'm quite new to Flink, so I might have
>   missed it. Is there any known workaround for concurrently launching
>   counts and other sink computations on the same DataSet?
> - Is it safe to perform a sequence of calls to DataSet sink methods like
>   count or collect on the same DataSet, as long as they are made from the
>   same thread? From my experience it looks like it is, but I'd like to get
>   a confirmation if possible.
>
> This might be related to
> https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
> but I'm not sure.
>
> Thanks a lot for your help.
>
> Greetings,
>
> Juan
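The failure mode described in the quoted message (a sink appended to a list while another call is iterating over it) can be reproduced without Flink. Below is a minimal Scala sketch, assuming a plain java.util.ArrayList as a hypothetical stand-in for the environment's sink list; SinkListRace, mutateWhileTraversing and runSerialized are illustrative names, not Flink APIs. The second method shows one generic workaround: serializing each "job" behind a shared lock.

```scala
import java.util.{ArrayList, ConcurrentModificationException}
import java.util.concurrent.{Callable, Executors}

// Illustrative only: a plain ArrayList stands in (hypothetically) for the
// environment's internal sink list. None of this is Flink code.
object SinkListRace {

  // Single-threaded, deterministic reproduction of the failure mode:
  // appending to an ArrayList while an iterator is walking it makes the
  // iterator's next() throw ConcurrentModificationException.
  def mutateWhileTraversing(): Boolean = {
    val sinks = new ArrayList[String]()
    sinks.add("sink-0")
    sinks.add("sink-1")
    try {
      val it = sinks.iterator()
      while (it.hasNext) {
        // Another "count" registering its sink mid-traversal.
        if (it.next() == "sink-0") sinks.add("sink-from-another-count")
      }
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }

  // Shared lock guarding the hypothetical sink list.
  private val lock = new Object

  // Workaround sketch: each task registers its sink, traverses the list and
  // resets it while holding the same lock, so no traversal can observe a
  // concurrent add. Returns the total number of sinks seen across all tasks.
  def runSerialized(tasks: Int): Int = {
    val sinks = new ArrayList[String]()
    val pool = Executors.newFixedThreadPool(tasks)
    try {
      val futures = (1 to tasks).map { i =>
        pool.submit(new Callable[Int] {
          def call(): Int = lock.synchronized {
            sinks.add(s"sink-$i")                         // register this task's sink
            var seen = 0
            val it = sinks.iterator()
            while (it.hasNext) { it.next(); seen += 1 }   // "translate" the plan
            sinks.clear()                                 // assumption: list is reset after each job
            seen
          }
        })
      }
      futures.map(_.get()).sum
    } finally {
      pool.shutdown()
    }
  }
}
```

Assuming the sink list is the only shared state involved, the analogous change in the original program would be wrapping each xs.count in a shared lock.synchronized { ... }; this is an untested assumption rather than documented Flink behaviour, and it gives up the parallelism the futures were meant to provide.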