> Can you share your experimental code somewhere please?

Sure, I have been thinking of how to do that, as I need some code review on the 
way to a PR. Just cleaning up the code a bit before I post it.

In cleaning up the code, I found that I like using `std/isolation` a lot as it 
makes things simple, but it seems to me there are a few warts on 
`std/channels`, as follows:

1\. It is marked as an Unstable API, which is fine since probably no one has 
used it much yet, **but it will remain Unstable for the indefinite future 
because it depends on the Unstable `atomics` library.** In researching why that 
is still so after all these years, I found [RFC 
#170](https://github.com/nim-lang/RFCs/issues/170), which has seemingly been 
lurking for quite some time, and a [fairly recent 
plan](https://github.com/timotheecour/Nim/issues/693) to pull together 
everything we lack regarding threading in light of doing things the Arc/Orc 
way. I suppose that a new `std/threadpool` library fits into that plan in the 
thread pool category, and I think that if I am going to do it, I would look 
into the dynamic thread pool route rather than just replacing the static thread 
pool of the old library.

2\. The new channel `open`/`close` procs don't seem to do anything useful 
other than atomically set/unset a boolean flag, since one can send/receive 
regardless of the open status. The only use I can think of for them is as an 
indicator that the current sender is finished with the channel and won't send 
anything beyond what the channel currently has buffered, so one shouldn't try 
to receive past that point; but it seems to me that there must be a cleaner 
API. Currently, the public API lets the receiver (or sender, I suppose) query 
the state only by trying to open a channel that is supposed to be open, to see 
whether it is already closed (presumably by the sender, who may be the 
creator); closing to check for closedness won't work because we might close and 
re-open a channel that a sender is still using. I suppose the big question here 
is who gets to open and/or close a channel since, as they are copied by 
reference counting, there can be any number of owners. One simple solution 
might be to create them open and allow them to be closed only once, with the 
means to check whether they are open by changing `open` to an `isOpen` 
predicate, as that is its only real function now. However, it would still just 
be an indicator to be checked, as the light-weight `send`/`recv` procs have no 
means of indicating success or failure.
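To make the "created open, closed only once, queried with `isOpen`" idea concrete, here is a minimal sketch of just the flag logic, built on `std/atomics`. The names `OpenGate`, `isOpen`, and `closeOnce` are hypothetical and are not part of `std/channels`; a real channel would embed something like this next to its buffer.

```nim
# Hypothetical sketch: OpenGate, isOpen, and closeOnce are illustrative
# names only; none of them exist in std/channels.
import std/atomics

type OpenGate = object
  closed: Atomic[bool]   # zero-initialized to false, so a new gate starts open

proc isOpen(g: var OpenGate): bool =
  ## Pure query: reads the flag without changing it.
  not g.closed.load()

proc closeOnce(g: var OpenGate): bool =
  ## Atomically marks the gate closed. Returns true only for the single
  ## caller that performed the open -> closed transition.
  not g.closed.exchange(true)

when isMainModule:
  var gate: OpenGate
  echo "open at creation: ", gate.isOpen
  echo "first close wins: ", gate.closeOnce()
  echo "open after close: ", gate.isOpen
  echo "second close wins: ", gate.closeOnce()
```

Because the check no longer mutates anything, any number of owners can call `isOpen` safely, and only one of them ever sees `closeOnce` return `true`.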

Rather than duplicating code that should already be in the new channels 
library, I really would like to be able to just use it for many of the 
structures required for the new thread pool library; if one could do that, the 
actual thread pool library (excluding an experimental `parallel` macro that 
should really be in its own library that may use the new thread pool library) 
is likely only a couple of hundred LOC.

To establish a baseline speed for the new channels library as compared to the 
old library, I used the following quick test:
    
    
    # establish the speed of inter-thread communication with new channels...
    # compile with "-d:release --gc:arc --threads:on"
    
    import std/[ sugar, times, os, monotimes, channels ]
    
    type Channel[T] = channels.Channel[T]
    let cNUMLOOPS = 10000000
    
    proc testNewChannels(): int =
      var ch = newChannel[int](); discard ch.open
      var thrd: Thread[Channel[int]]
      createThread(thrd, (och: Channel[int]) =>
        (for _ in 0 ..< cNUMLOOPS: send(och, 1)), ch)
      sleep(200) # note: problem if listening before sending is started!!!
      for _ in 0 ..< cNUMLOOPS: result += ch.recv
      thrd.joinThread; discard ch.close
    
    proc main() =
      let strt = getMonoTime()
      let answr0 = testNewChannels()
      let elpsd = (getMonoTime() - strt).inMilliseconds
      echo "Testing new channels ", answr0, " times took ", elpsd, " milliseconds"
    
    when isMainModule: main()
    
    

which, when run on my machine, shows that the basic channel rate is over ten 
million a second, as compared to a similar test for the old channels that shows 
a rate of about six million a second. **But** note that there is a problem with 
the new library: I had to put in the `sleep(200)` (or so), which would seem to 
mean that one has to start feeding the channel before one can start eating from 
it. This seems to be true whether the channel is unbuffered (elements = 1) or 
not, at least when used in this way.
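The "similar test for the old channels" isn't shown above. A minimal sketch of what such a baseline might look like with the built-in `Channel[T]` from `system` follows; the loop count and the names `oldCh`, `sender`, and `testOldChannels` are assumptions for illustration, not the exact code behind the six-million figure.

```nim
# Sketch of an old-channel baseline (assumed shape, not the original test).
# compile with "-d:release --threads:on"

import std/[times, monotimes]

const numLoops = 1_000_000   # assumed count, smaller than the test above

var oldCh: Channel[int]      # built-in system channel, shared as a global

proc sender() {.thread.} =
  # producer thread: push numLoops ones into the shared channel
  for _ in 0 ..< numLoops: oldCh.send(1)

proc testOldChannels(): int =
  oldCh.open()               # old channels must be opened before use
  var thrd: Thread[void]
  createThread(thrd, sender)
  # consumer: recv blocks until a message is available
  for _ in 0 ..< numLoops: result += oldCh.recv()
  joinThread(thrd)
  oldCh.close()

proc main() =
  let strt = getMonoTime()
  let answr = testOldChannels()
  let elpsd = (getMonoTime() - strt).inMilliseconds
  echo "Testing old channels ", answr, " times took ", elpsd, " milliseconds"

when isMainModule: main()
```

Since `recv` on the old channels blocks until something arrives, this version needs no `sleep` before the receive loop.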

The above isn't testing the thread creation rate; it shows only the basic 
channel communication rate, so that when I measure spawning from a thread pool 
I know the channel rate won't distort the measurements. For instance, we can 
test the rate of the old thread pool spawning by just modifying the above to:
    
    
    # establish the speed of spawning old thread pool threads...
    # compile with "-d:release --gc:arc --threads:on"
    
    import std/[ sugar, times, monotimes, channels, threadpool ]
    
    type Channel[T] = channels.Channel[T]
    let cNUMLOOPS = 100000
    
    proc testOldSpawning(): int =
      var ch = newChannel[int](); discard ch.open
      for _ in 0 ..< cNUMLOOPS:
        spawn(((och: Channel[int]) => send(och, 1))(ch))
        result += ch.recv
      sync(); discard ch.close
    
    proc main() =
      let strt = getMonoTime()
      let answr0 = testOldSpawning()
      let elpsd = (getMonoTime() - strt).inMilliseconds
      echo "Testing old spawning ", answr0, " times took ", elpsd, " milliseconds"
    
    when isMainModule: main()
    
    

which shows that the rate of spawning for the old thread pool library is about 
133,000 per second and we know that the channel rate isn't really affecting the 
measurement because it is about sixty times faster. So now we can test the 
method that I plan to use as follows:
    
    
    # establish the speed of new spawning of thread pool threads...
    # compile with "-d:release --gc:arc --threads:on"
    
    import std/[ times, monotimes, channels ]
    from cpuinfo import countProcessors
    
    type Channel[T] = channels.Channel[T]
    let cNUMLOOPS = 100000
    let cNUMPROCS = countProcessors()
    
    # a global threadpool...
    type
      WorkProc = proc(och: Channel[int]): void {.gcsafe, nimcall.}
      WorkObj = object
        wp: WorkProc
        warg: Channel[int]
    var wrkch = newChannel[WorkObj](1)
    var thrds = newSeq[Thread[Channel[WorkObj]]](cNUMPROCS)
    proc wrkrproc(wch: Channel[WorkObj]) =
      {.gcsafe.}:
        while true:
          var wrk = wch.recv; wrk.wp(wrk.warg)
    for i in 0 ..< cNUMPROCS: createThread(thrds[i], wrkrproc, wrkch)
    proc spawn(iwp: WorkProc, och: Channel[int]) =
      send(wrkch, WorkObj(wp: iwp, warg: och))
    
    proc testNewSpawning(): int =
      var ch = newChannel[int](); discard ch.open
      proc doit(och: Channel[int]) =
        {.gcsafe.}:
          send(och, 1)
      for _ in 0 ..< cNUMLOOPS:
        spawn(doit, ch)
        result += ch.recv
      discard ch.close
    
    proc main() =
      let strt = getMonoTime()
      let answr0 = testNewSpawning()
      let elpsd = (getMonoTime() - strt).inMilliseconds
      echo "Testing new spawning ", answr0, " times took ", elpsd, " milliseconds"
    
    when isMainModule: main()
    
    

which runs at about the same speed as the old spawning, or a little faster. To 
make this run for all 100000 iterations, I had to make the work queue channel 
un-buffered, which seems to indicate that the bug with new channels has to do 
with buffering, but I haven't chased that down yet. It seems the new channels 
library needs some more complete tests...

Of course, this is just a Proof Of Concept and I haven't made the routines 
generic to accept any work type, and I don't want to have to depend on running 
the thread pool as globals but plan to write it so that one can construct as 
many thread pools as required and maybe even pass them around, but that will be 
in the PR. In the interests of making it even easier to use so one doesn't have 
to create their own output results channel for spawned proc's that return 
something (or nothing/`void`), I plan to make the `spawn` automatically create 
an un-buffered new channel to be used as the old `FlowVar[T]` and see no reason 
that won't work effectively.

> And a guest blog post about your findings would really be appreciated too, of 
> course.

Again, I have a few ideas for a couple of blog posts related to multi-threading, 
but I am not sure about the steps to publish one; I understand one needs to get 
some sort of invitation or make some sort of application?
