Hi,

I have a very simple program, using the local execution environment, that
throws NullPointerExceptions (NPEs) and other concurrency-related exceptions
when launching a count on a DataSet from different threads. The program is
https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e which is
basically this:

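import java.util.concurrent.Executors
import org.apache.flink.api.scala._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Body of a specs2 example, hence the `ok` result at the end
def doubleCollectConcurrent = {
  val env = ExecutionEnvironment.createLocalEnvironment(3)
  val xs = env.fromCollection(1 to 100).map(_ + 1)
  // 10 threads, so several counts on the same DataSet run at the same time
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val pendingActions = Seq.fill(10)(
    Future { println(s"xs.count = ${xs.count}") }
  )
  val pendingActionsFinished = Future.fold(pendingActions)(()) { (_, _) =>
    println("pending action finished")
    ()
  }
  Await.result(pendingActionsFinished, 10.seconds)

  ok
}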


It looks like the issue is in OperatorTranslation.java at
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
when a sink is added to the sinks list while that list is being traversed.
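To make sure I understand the failure mode, this is a minimal, single-threaded
Scala sketch of the same access pattern. It assumes the sinks are held in a
plain, unsynchronized java.util.ArrayList; `CmeDemo` and `addWhileTraversing`
are hypothetical names, not Flink code. Appending to the list while an iterator
over it is live makes the iterator's next call fail fast with a
ConcurrentModificationException. With several threads the same race depends on
timing, so it may or may not trigger on a given run.

```scala
import java.util.{ArrayList, ConcurrentModificationException}

// Hypothetical stand-in for an environment's list of registered sinks.
// Not Flink's actual class: it only reproduces the "add while traversing" pattern.
object CmeDemo {
  // Returns true if appending to the list during traversal raises a CME.
  def addWhileTraversing(): Boolean = {
    val sinks = new ArrayList[String]()
    sinks.add("sink-1")
    sinks.add("sink-2")
    val it = sinks.iterator()
    try {
      while (it.hasNext) {
        val s = it.next()
        // Simulates another caller (e.g. a second count) registering a new sink
        // while the plan is still being built from the existing ones.
        if (s == "sink-1") sinks.add("sink-3")
      }
      false
    } catch {
      // ArrayList's iterator detects the structural change on the next call to next()
      case _: ConcurrentModificationException => true
    }
  }
}
```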
I have the impression that this is by design, so I'd like to confirm whether
this is the expected behaviour, and whether it happens only with the local
execution environment or affects all execution environment implementations.
Other related questions I have are:

   - Is this documented somewhere? I'm quite new to Flink, so I might have
   missed it. Is there any known workaround for concurrently launching
   counts and other sink computations on the same DataSet?
   - Is it safe to perform a sequence of calls to DataSet sink methods like
   count or collect on the same DataSet, as long as they are made from the
   same thread? From my experience it seems to be, but I'd like to get a
   confirmation if possible.
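For the workaround question, one option I'm considering is to keep the Futures
but run every sink call on a dedicated single-threaded executor, so the
environment is only ever touched by one thread at a time, which is the
same-thread case from the second question. The sketch below models this
without Flink: `ToySinkEnv` and `SerializedCounts.runAll` are hypothetical
names, and `ToySinkEnv` is not Flink's ExecutionEnvironment; it just registers
a sink and then traverses and clears an unsynchronized list. I haven't
confirmed that this is enough for the real environment.

```scala
import java.util.ArrayList
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical model of an environment with an unsynchronized sink list:
// register a sink, traverse all registered sinks to "build a plan", then clear them.
class ToySinkEnv {
  private val sinks = new ArrayList[String]()

  def count(name: String): Int = {
    sinks.add(name)
    var n = 0
    val it = sinks.iterator()
    while (it.hasNext) { it.next(); n += 1 }
    sinks.clear()
    n
  }
}

object SerializedCounts {
  // Runs all `count` calls on one dedicated thread: callers still get Futures,
  // but the toy environment is never accessed by two threads at once.
  def runAll(env: ToySinkEnv, jobs: Int): Seq[Int] = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val pending = (1 to jobs).map(i => Future(env.count(s"sink-$i")))
      Await.result(Future.sequence(pending), 10.seconds)
    } finally pool.shutdown()
  }
}
```

In the original program this would amount to creating the
`Future { xs.count }` calls on
`ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())` instead of
the 10-thread pool, at the cost of the counts running one after another rather
than in parallel.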

This might be related to
https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
but I'm not sure.

Thanks a lot for your help.

Greetings,

Juan
