I've spent a few days on this and boiled this down to a minimal example. Can an
expert with Nim parallelism please tell me where I'm going wrong here?
Output changes from run to run (is stochastic) but I did not intend that and do
not see how this can be. Here are several runs in a row of a very simple thread
pool algorithm that adds 1 to each element of a seq every time a job is
submitted to the pool.
1040.0 == 1000.0 (false)
1020.0 == 1000.0 (false)
1039.0 == 1000.0 (false)
1000.0 == 1000.0 (true) <-- the intended result
1069.0 == 1000.0 (false)
1040.0 == 1000.0 (false)
995.0 == 1000.0 (false)
Run
Minimal Example
import locks
from math import sum
from strformat import `&`
from std/decls import byaddr
const NTHREADS = 8 # pool size
const NSUBMISSIONS = 100 # number of jobs to submit to pool
const NVALUES = 10 # length of reduction data seq
type
ThreadComm = enum Begin Ready Quit
ThreadParams = tuple
threadDataPtr: ptr ThreadData
resultsPtr: ptr Reduction
ThreadData = object
input, output: Channel[ThreadComm]
handle: Thread[ThreadParams]
Reduction = object
values: seq[float]
lock: Lock
proc singleThread(args: ThreadParams) =
let threadData {.byaddr.} = args.threadDataPtr[]
let results {.byaddr.} = args.resultsPtr[]
threadData.output.send(ThreadComm.Ready)
while true:
let command = threadData.input.recv() # 'Begin' or 'Quit' received
if command == ThreadComm.Quit: break
acquire results.lock
for i in 0..<results.values.len:
results.values[i] += 1.0
release results.lock
threadData.output.send(ThreadComm.Ready)
proc main =
var pool = newSeq[ThreadData](len=NTHREADS)
var reduction = Reduction(values: newSeq[float](len=NVALUES)) # target
for reduce operation
for thread in pool.mitems:
open thread.input
open thread.output
thread.handle.createThread( singleThread, param=(addr thread, addr
reduction) )
# work submission loop
var workID: Natural
while workID < NSUBMISSIONS:
for thread in pool.mitems:
let channel = thread.output.tryRecv() # no data if thread not done yet
if channel.dataAvailable: # assume state == ThreadComm.Ready if
anything received
inc workID
thread.input.send(ThreadComm.Begin)
# wrap up
for thread in pool.mitems:
thread.input.send(ThreadComm.Quit)
for thread in pool.mitems:
joinThread thread.handle
close thread.input
close thread.output
# report
const expected = 1000.0
let total = sum(reduction.values)
let TorF = total == expected
echo &"{total} == {expected} ({TorF})"
when isMainModule:
main()
Run
The goal is to use this minimal thread pool implementation, not a library. Yes,
I'm aware of the libraries and have used them all. I'd love to understand why
this doesn't work as I intend because it's rather basic. Please help me move
along to the facepalm moment :)