Hi Juan,
As far as I know, we do not provide any concurrency guarantees for
count() or collect(). Those methods need to be used with caution anyway,
as the result size must not exceed a certain threshold. I will loop in
Fabian, who might know more about the internals of the execution.
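Until that is clarified, one conservative option is to run all actions on a single worker thread so that they never overlap. The following is only a minimal sketch of that pattern, assuming that sequential calls from one thread are safe, which has not been confirmed here. `SerializedActions`, `runSerialized` and `fakeCount` are hypothetical names, and `fakeCount` is a stand-in for a real action such as `xs.count` so that the sketch runs without Flink.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SerializedActions {
  // Hypothetical stand-in for a DataSet action such as xs.count.
  def fakeCount(): Long = (1 to 100).map(_ + 1).size.toLong

  // Runs `action` n times on a single worker thread, so the calls
  // execute strictly one after another and never concurrently.
  def runSerialized[A](n: Int)(action: () => A): Seq[A] = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val pending = Seq.fill(n)(Future { action() })
      Await.result(Future.sequence(pending), 10.seconds)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val counts = runSerialized(10)(() => fakeCount())
    println(s"counts = $counts")
  }
}
```

The calling code can still be written with Futures; only the executor changes from a 10-thread pool to a single thread, which gives up parallelism between actions in exchange for never running two of them at once.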
Regards,
Timo
On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:
Any thoughts on this?
On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá
<juan.rodriguez.hort...@gmail.com> wrote:
Hi,
I have a very simple program using the local execution
environment that throws NPEs and other exceptions related to
concurrent access when launching a count on a DataSet from
different threads. The program is
https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e
which is basically this:
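import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.flink.api.scala._

def doubleCollectConcurrent = {
  val env = ExecutionEnvironment.createLocalEnvironment(3)
  val xs = env.fromCollection(1 to 100).map { _ + 1 }
  implicit val ec =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
  // Launch 10 counts on the same DataSet from different threads
  val pendingActions = Seq.fill(10)(
    Future { println(s"xs.count = ${xs.count}") }
  )
  val pendingActionsFinished = Future.fold(pendingActions)(Unit) { (u1, u2) =>
    println("pending action finished")
    Unit
  }
  Await.result(pendingActionsFinished, 10 seconds)
  ok // test-framework result value, as in the gist
}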
It looks like the issue is on OperatorTranslation.java at
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
when a sink is added to the sinks list while that list is being
traversed. I have the impression that this is by design, so I'd
like to confirm that this is the expected behaviour, and whether
this is happening only for the local execution environment, or if
this affects all execution environment implementations. Other
related questions I have are:
* Is this documented somewhere? I'm quite new to Flink, so I
might have missed this. Is there any known workaround for
concurrently launching counts and other sink computations on
the same DataSet?
* Is it safe to perform a sequence of calls to DataSet sink
methods like count or collect, on the same DataSet, as long as
they are performed from the same thread? From my experience it
looks like it is, but I'd like to get a confirmation if possible.
This might be related to
https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
but I'm not sure.
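To illustrate the kind of failure described above (a list being appended to while it is traversed), here is a minimal sketch that needs no Flink. It assumes the sinks list behaves like a plain `java.util.ArrayList`, which I have not verified; `ListModifiedWhileTraversed` and `appendWhileIterating` are hypothetical names used only for this example.

```scala
import java.util.{ArrayList, ConcurrentModificationException}

object ListModifiedWhileTraversed {
  // Returns true if appending to the list during iteration trips
  // the iterator's fail-fast check.
  def appendWhileIterating(): Boolean = {
    val sinks = new ArrayList[String]()
    sinks.add("sink-1")
    sinks.add("sink-2")
    try {
      val it = sinks.iterator()
      while (it.hasNext) {
        it.next()
        // Structural modification while the iterator is still in use.
        sinks.add("sink-added-during-traversal")
      }
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }

  def main(args: Array[String]): Unit =
    println(s"ConcurrentModificationException thrown: ${appendWhileIterating()}")
}
```

In this single-threaded version the exception is deterministic. With several threads the interleaving varies between runs, which would be consistent with seeing different exceptions from one run to the next.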
Thanks a lot for your help.
Greetings,
Juan