Here seems to be a safer implementation... but I'm still not quite 
confident it's optimally written. Opinions welcome!

function subdivide(v0,v1,filter::Function,middle::Function,doer::Function)
    c = Channel(1000000)
    results = Any[doer(v0),doer(v1)]
    filter(v0,v1) || return results # nothing to subdivide

    put!(c,(results[1],results[2]))

    np = nprocs() # determine the number of processes available
    idle = Array(Bool,np)
    idle[myid()] = true
    @sync begin
        for p=1:np
            if p != myid() || np == 1
                @async begin
                    while true
                        if isready(c)
                            idle[p] = false
                            (d0,d1) = take!(c)
                            v = middle(d0,d1)
                            info("Doing task $v on process $p, queue 
$c...\n")
                            d = remotecall_fetch(p,doer,v)
                            push!(results,d)
                            filter(d0,d) && put!(c,(d0,d))
                            filter(d,d1) && put!(c,(d,d1))
                        else
                            idle[p] = true
                            if all(idle) break end
                            yield()
                        end
                    end
                end
            end
        end
    end
    results
end

On Friday, 4 September 2015 09:30:50 UTC+2, Laurent Bartholdi wrote:
>
> Hello world,
> I'm completely new to Julia, and advice/comments are welcome. I'd like to 
> run calculations (~ 1 second per job) distributed along the different cores 
> of my CPU. Each calculation may trigger new calculations.
> More specifically, each calculation works on a number in an interval, and 
> may trigger a subdivision of the interval.
>
> I thought about the following implementation:
>
> function 
> subdivide(v0::Number,v1::Number,filter::Function,middle::Function,doer::Function)
>     c = Channel(1000)
>     results = Any[doer(v0),doer(v1)]
>     put!(c,(results[1],results[2]))
>     np = nprocs() # determine the number of processes available
>     @sync begin
>         for p=1:np
>             if p != myid() || np == 1
>        @async begin
>    for (d0,d1)=c
>                         v = middle(d0,d1)
>        #println("Doing $v on process $p")
>                         d = remotecall_fetch(p,doer,v)
>                         push!(results,d)
>                         filter(d0,d) && put!(c,(d0,d))
>                         filter(d,d1) && put!(c,(d,d1))
> isready(c) || break
>            end
>                 end
>             end
> end
>     end
>     results
> end
>
> typical use:
>
> julia> 
> subdivide(0,10,(x,y)->x[1]<y[1]-0.7,(x,y)->(x[1]+y[1])/2,v->[v,string(v)])
> 17-element Array{Any,1}:
>  Any[0,"0"]
>  Any[10,"10"]
>  ⋮
>  Any[8.125,"8.125"]
>  Any[9.375,"9.375"]
>
> As you see, the function "subdivide" receives a "doer" which computes a 
> bit of data, and a "filter" which decides if a middle point is required 
> between data points. "middle" computes the argument of a middle point.
>
> My worry is: certainly there will be race conditions when a calculation 
> has been removed from the channel, but the new calculations will not yet 
> have been put in; then one of the cores will stop prematurely. The last 
> "isready(c)" could also, potentially, return true but the upcoming "for 
> (d0,d1)=c" will block because meanwhile a calculation has been removed by 
> another (green) thread.
>
> How should I program this safely? It seems like a useful primitive, so 
> maybe it already exists...
>
> Thanks in advance, Laurent
>

Reply via email to