Hi Juan,

As far as I know, we do not provide any concurrency guarantees for count() or collect(). Those methods need to be used with caution anyway, as the result size must not exceed a certain threshold. I will loop in Fabian, who might know more about the internals of the execution.
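
In the meantime, one conservative option is to funnel all such calls through a single thread so that they never overlap. This is only a minimal sketch: it assumes that sequential calls from one thread are safe, which matches what you observed but is not confirmed here. `runSerialized` is a hypothetical helper, not a Flink API, and the actions are plain functions standing in for calls like `() => xs.count`.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SerializedActions {
  // Hypothetical helper (not part of Flink): runs every action on one
  // dedicated thread, in submission order, so no two actions overlap.
  def runSerialized[A](actions: Seq[() => A], timeout: FiniteDuration): Seq[A] = {
    val executor = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    try {
      // Each Future is queued on the single worker thread.
      val futures = actions.map(action => Future(action()))
      Await.result(Future.sequence(futures), timeout)
    } finally {
      executor.shutdown()
    }
  }
}
```

With a DataSet this would be used roughly as `SerializedActions.runSerialized(Seq.fill(10)(() => xs.count), 10.seconds)`. The callers can still live on different threads; only the actual sink calls are serialized.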

Regards,
Timo


On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:
Any thoughts on this?

On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

    Hi,

    I have a very simple program using the local execution
    environment, that throws NPE and other exceptions related to
    concurrent access when launching a count for a DataSet from
    different threads. The program is
    https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e
    which is basically this:

    def doubleCollectConcurrent = {
       val env = ExecutionEnvironment.createLocalEnvironment(3)
       val xs = env.fromCollection(1 to 100).map{_+1}
       implicit val ec =
         ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

       val pendingActions = Seq.fill(10)(
         Future { println(s"xs.count = ${xs.count}") }
       )
       val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) =>
         println("pending action finished")
         Unit }
       Await.result(pendingActionsFinished, 10 seconds)

       ok }


    It looks like the issue is on OperatorTranslation.java at
    https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
    when a sink is added to the sinks list while that list is being
    traversed. I have the impression that this is by design, so I'd
    like to confirm that this is the expected behaviour, and whether
    this is happening only for the local execution environment, or if
    this affects all execution environments implementations. Other
    related questions I have are:

      * Is this documented somewhere? I'm quite new to Flink, so I
        might have missed this. Is there any known workaround for
        concurrently launching counts and other sink computations on
        the same DataSet?
      * Is it safe to perform a sequence of calls to DataSet sink
        methods like count or collect, on the same DataSet, as long as
        they are performed from the same thread? From my experience it
        looks like it is, but I'd like to get a confirmation if possible.

    This might be related to
    https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
    but I'm not sure.

    Thanks a lot for your help.

    Greetings,

    Juan

