Re: Fiber and Thread Communication
On 04/09/2016 07:45 AM, Nordlöw wrote:
> On Friday, 8 April 2016 at 10:51:49 UTC, Nordlöw wrote:
> AFAICT, it is not clear what are the limitations of the current
> std.concurrency and, from what. An illustrating example on task-based
> parallellism (such as the ones in jin.go) should partly alleviate this
> problem.

I don't know how much this helps but I was able to write an example that seems to work (elsewhere in this thread):

http://forum.dlang.org/post/nec62k$26v7$1...@digitalmars.com

Ali
Re: Fiber and Thread Communication
On 04/08/2016 02:42 PM, Dicebot wrote:

>> Thanks Dicebot. I don't think the included
>> std.concurrency.FiberScheduler has support for message passing because
>> FiberScheduler.spawn does not return a Tid. If so, I don't see how
>> it's possible to send messages between fibers.
>>
>> Ali
>
> Looks like a (funny) oversight.

Sorry, I misled you. :)

> Note that you get it for get fiber via
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1337
> (and FiberScheduler specifically extends Fiber to add ThreadInfo to it)
> but there is no clear way to pass that info to spawn host. I have a
> feeling that if that code is patched to simply provide Tid, message
> passing will just magically work. Needs to be checked though.

It turns out, instead of calling scheduler.spawn() directly, the program sets the __gshared 'scheduler' variable first and then calls spawn() as usual, which does return that fiber's Tid:

import std.stdio;
import std.concurrency;
import std.range;
import std.algorithm;

struct Done {
}

void workerTask(int id) {
    writefln("workerTask %s started", id);

    bool done = false;
    while (!done) {
        receive(
            (int message) {
                writefln("workerTask %s received %s", id, message);
                ownerTid.send(message * id);
            },

            (Done message) {
                writefln("workerTask %s received Done", id);
                done = true;
            });

        // Seems not to be needed:
        // scheduler.yield();
    }

    writefln("workerTask %s exiting", id);
}

void mainTask() {
    enum workerCount = 5;
    enum loopCount = 3;

    writeln("mainTask started");

    auto workers = iota(workerCount)
                   .map!(id => spawn(&workerTask, id))
                   .array;

    foreach (i; 0 .. loopCount) {
        foreach (id, worker; workers) {
            worker.send(i);
            auto response = receiveOnly!int();
            assert(response == i * id);
            writefln("mainTask received %s", response);
        }
    }

    writeln("mainTask sending Done messages");

    foreach (worker; workers) {
        worker.send(Done());
    }

    writeln("mainTask exiting");
}

void main() {
    scheduler = new FiberScheduler;
    scheduler.start({
        mainTask();
    });
}

Ali
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 10:51:49 UTC, Nordlöw wrote:
> Are there any plans to unite

AFAICT, it is not clear what the limitations of the current std.concurrency are. An illustrative example of task-based parallelism (such as the ones in jin.go) should partly alleviate this problem.

Any ideas on what such an example should contain and illustrate? Current limitations and plans for fixing them should be described as well.

References:

http://forum.dlang.org/post/mailman.776.1459177268.26339.digitalmar...@puremagic.com
http://code.dlang.org/packages/jin-go
https://github.com/nin-jin/go.d

Further, who's up for the job of adding the missing parts to std.concurrency using ideas and code from https://github.com/nin-jin/go.d ? :)
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 20:25:11 UTC, Ali Çehreli wrote:
> On 04/08/2016 01:16 PM, Dicebot wrote:
>> On Friday, 8 April 2016 at 19:46:17 UTC, tcak wrote:
>>> On Friday, 8 April 2016 at 15:33:46 UTC, Dicebot wrote:
>>>> On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
>>>>> So a TId can represent either a thread or a fiber?
>>>>
>>>> AFAIR, yes (I haven't used std.concurrency in a long while, telling all from memory only).
>>>
>>> yes what? Thread or Fiber.
>>
>> Yes both :) Tid represent abstract execution context, with no implications about underlying executor.
>
> Thanks Dicebot. I don't think the included std.concurrency.FiberScheduler has support for message passing because FiberScheduler.spawn does not return a Tid. If so, I don't see how it's possible to send messages between fibers.
>
> Ali

Looks like a (funny) oversight. Note that you can get it for a fiber via https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1337 (and FiberScheduler specifically extends Fiber to add ThreadInfo to it), but there is no clear way to pass that info to the spawn host. I have a feeling that if that code is patched to simply provide a Tid, message passing will just magically work. Needs to be checked though.
Re: Fiber and Thread Communication
On 04/08/2016 01:16 PM, Dicebot wrote:
> On Friday, 8 April 2016 at 19:46:17 UTC, tcak wrote:
>> On Friday, 8 April 2016 at 15:33:46 UTC, Dicebot wrote:
>>> On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
>>>> So a TId can represent either a thread or a fiber?
>>>
>>> AFAIR, yes (I haven't used std.concurrency in a long while, telling all from memory only).
>>
>> yes what? Thread or Fiber.
>
> Yes both :) Tid represent abstract execution context, with no implications about underlying executor.

Thanks Dicebot. I don't think the included std.concurrency.FiberScheduler has support for message passing because FiberScheduler.spawn does not return a Tid. If so, I don't see how it's possible to send messages between fibers.

Ali
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 19:46:17 UTC, tcak wrote:
> On Friday, 8 April 2016 at 15:33:46 UTC, Dicebot wrote:
>> On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
>>> So a TId can represent either a thread or a fiber?
>>
>> AFAIR, yes (I haven't used std.concurrency in a long while, telling all from memory only).
>
> yes what? Thread or Fiber.

Yes, both :) Tid represents an abstract execution context, with no implications about the underlying executor.
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 15:33:46 UTC, Dicebot wrote:
> On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
>> So a TId can represent either a thread or a fiber?
>
> AFAIR, yes (I haven't used std.concurrency in a long while, telling all from memory only).

yes what? Thread or Fiber.

---

Anyway. Since a Fiber is not like a thread, and a thread starting a Fiber is like calling a normal function, I guess TId still represents the thread.
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
> So a TId can represent either a thread or a fiber?

It represents a "logical thread", which currently consists of coroutines or OS threads but could theoretically be extended to, say, other processes or even other machines.
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 14:08:39 UTC, Nordlöw wrote:
> So a TId can represent either a thread or a fiber?

AFAIR, yes (I haven't used std.concurrency in a long while, telling all from memory only).
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 13:15:07 UTC, Dicebot wrote:
> On Friday, 8 April 2016 at 11:18:11 UTC, Nordlöw wrote:
>> On Friday, 8 April 2016 at 11:01:21 UTC, Dicebot wrote:
>>> Doesn't std.concurrency support both right now? I remember seeing PR that adds message box support to fibers ages ago.
>
> See https://issues.dlang.org/show_bug.cgi?id=12090 and https://github.com/D-Programming-Language/phobos/pull/1910 for relevant code (as you can see it was merged several releases ago)
>
>> 1. What functions provide message box communication?
>
> The same ones as thread ones. API is completely transparent.
>
>> 2. But Fibers cannot currently be moved between threads right?
>
> Yes, and this is by design. It harms performance of concurrent apps.

So a TId can represent either a thread or a fiber?
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 11:18:11 UTC, Nordlöw wrote:
> On Friday, 8 April 2016 at 11:01:21 UTC, Dicebot wrote:
>> Doesn't std.concurrency support both right now? I remember seeing PR that adds message box support to fibers ages ago.

See https://issues.dlang.org/show_bug.cgi?id=12090 and https://github.com/D-Programming-Language/phobos/pull/1910 for relevant code (as you can see it was merged several releases ago)

> 1. What functions provide message box communication?

The same ones as thread ones. API is completely transparent.

> 2. But Fibers cannot currently be moved between threads right?

Yes, and this is by design. It harms performance of concurrent apps.
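To make the "same functions for threads and fibers" point concrete, here is a minimal sketch, not a definitive implementation. The names `doubler` and `roundTrip` are made up for illustration; the fiber branch assumes the pattern from Ali's example in this thread (assign `scheduler` first, then call `spawn()` as usual inside `scheduler.start`).

```d
import std.concurrency;
import std.stdio;

// Hypothetical worker: doubles one int and sends it back to its owner.
void doubler()
{
    receive((int value) { ownerTid.send(value * 2); });
}

// Hypothetical helper: the same spawn/send/receive calls are used
// whether the logical thread is an OS thread or a fiber.
int roundTrip(int value)
{
    auto worker = spawn(&doubler);
    worker.send(value);
    return receiveOnly!int();
}

void main(string[] args)
{
    if (args.length > 1 && args[1] == "fibers")
    {
        // Fiber mode: set the scheduler before any spawn() call.
        scheduler = new FiberScheduler;
        scheduler.start({ writeln("fiber: ", roundTrip(21)); });
    }
    else
    {
        // Default mode: spawn() starts an OS thread.
        writeln("thread: ", roundTrip(21));
    }
}
```

Run it without arguments for the thread version, or with the argument `fibers` for the fiber version; `roundTrip` and `doubler` are untouched in both cases.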
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 11:01:21 UTC, Dicebot wrote:
> Doesn't std.concurrency support both right now? I remember seeing PR that adds message box support to fibers ages ago.

What progress has been made since this post:

http://forum.dlang.org/post/k4jsef$26h6$1...@digitalmars.com
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 11:01:21 UTC, Dicebot wrote:
> Doesn't std.concurrency support both right now? I remember seeing PR that adds message box support to fibers ages ago.

1. What functions provide message box communication?

2. But Fibers cannot currently be moved between threads right?
Re: Fiber and Thread Communication
On Friday, 8 April 2016 at 10:51:49 UTC, Nordlöw wrote:
> Are there any plans to unite fiber-to-fiber communication with thread-to-thread communication in Phobos?
>
> Does vibe.d give any solutions here?

Doesn't std.concurrency support both right now? I remember seeing PR that adds message box support to fibers ages ago.
Fiber and Thread Communication
Are there any plans to unite fiber-to-fiber communication with thread-to-thread communication in Phobos? Does vibe.d give any solutions here?
Re: Thread communication
On Thursday, 6 August 2015 at 08:40:58 UTC, Kagamin wrote:
> AFAIK, boost does it by integrating support for interruption into various functions, so IO, waits and locks reply to interrupt requests appropriately. You can do something similar.

I understand the philosophy behind D-threads. However, I have a situation where waiting for a thread to react to an abort signal (if it reacts at all) and finish according to a protocol can cause a delay that may not be acceptable to the user or cause inconsistencies. Instant abortion works best with data sharing. However, then I have the ugly situation where I have to place the abort flag at strategic places in several functions/blocks to make sure the task will not be pursued, because you never know when exactly the new input will arrive. In this way it can be intercepted. Unfortunately, this is messy and it is not easy to avoid data races.

A possible solution would be to halt all threads except for the main thread, spawn a new thread, and end the old thread silently behind the scenes. I'm not sure if this is possible, though. I also wonder if it would be possible to use some sort of observer that never sleeps.
Re: Thread communication
On Tuesday, 4 August 2015 at 15:19:51 UTC, Chris wrote:
> foreach (ref i; 0..10) {
>     writefln("%d.\tDoing something with input %s", i+1, input);
>     Thread.sleep(500.msecs);
> }

AFAIK, boost does it by integrating support for interruption into various functions, so IO, waits and locks reply to interrupt requests appropriately. You can do something similar.
Re: Thread communication
On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
> It was a conscious decision not to provide a kill method for threads, because it is impossible to guarantee that your program is still consistent afterwards.

What about the situation where we want to kill worker threads off when closing a program? For example, I have a program with a thread that does some heavy computation in the background. When the application is closed, I want it to abort that computation, however I can't just slap a receiveTimeout in the worker thread because it is doing its work in a parallel foreach loop.
Re: Thread communication
On Wednesday, 5 August 2015 at 14:34:42 UTC, Alex Parrill wrote:
> On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
>> Maybe we can lift this restriction if we know that the thread's main function is pure and takes no references to mutable data, because then it can by definition never mess up the program's state.
>
> That'd be a pretty useless thread; how would it communicate results back to the main thread (or wherever it should go)?

It could return something. `std.concurrency.Tid` would have to be extended with a `join()` method that returns its result. Or we could somehow allow sending and receiving data.
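`Tid` has no such `join()` method, so the following is not the proposal above. It is only a sketch of a nearby facility, `std.parallelism`'s `task`, which already lets a pure function run in its own thread and hand back its return value. The function `sumOfSquares` is a made-up stand-in for real work, and nothing here makes the thread killable.

```d
import std.parallelism : task;
import std.stdio;

// Hypothetical pure computation: no references to mutable shared data.
ulong sumOfSquares(ulong n) pure nothrow @nogc @safe
{
    ulong total = 0;
    foreach (i; 1 .. n + 1)
        total += i * i;
    return total;
}

void main()
{
    // Wrap the call in a Task and run it in a dedicated new thread.
    auto job = task!sumOfSquares(10UL);
    job.executeInNewThread();

    // yieldForce blocks until the task is done and returns its result,
    // which is roughly the role a join() on Tid would play.
    writeln(job.yieldForce);
}
```

The design point is that the result travels through the task object rather than through a mailbox, so the worker function itself needs no messaging code.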
Re: Thread communication
On Wednesday, 5 August 2015 at 11:23:28 UTC, Chris wrote:
> The problem is that it works up to a certain extent with receiveTimeout. However, if the input arrives in very short intervals, all the solutions I've come up with so far (including data sharing) fail sooner or later. New threads are spawned faster than old ones can be given the abort signal. There are ways to wait, till a given thread dies, say with a shared variable isAlive `while (isAlive) {}`, but even here I've come across problems when the input comes very fast.

You could use a thread pool, thereby limiting the number of threads that can run at any one time. But I guess you want the processing of new data to start as soon as possible, in which case that wouldn't help you.

> I don't know how to solve this problem, because message passing follows a linear protocol (as far as I understand it) and shared variables give rise to data races. Something like pthread_kill() would indeed be useful, to terminate a thread at random. I wonder if fibers would be an option.
>
> D-threads seem to be based on the assumption that there is no need to abort threads at random, any time. Or am I mistaken?

It was a conscious decision not to provide a kill method for threads, because it is impossible to guarantee that your program is still consistent afterwards. Maybe we can lift this restriction if we know that the thread's main function is pure and takes no references to mutable data, because then it can by definition never mess up the program's state. OTOH, the GC might be running at the time the thread is killed, which could again lead to inconsistencies...
Re: Thread communication
On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
> Maybe we can lift this restriction if we know that the thread's main function is pure and takes no references to mutable data, because then it can by definition never mess up the program's state.

That'd be a pretty useless thread; how would it communicate results back to the main thread (or wherever it should go)?
Re: Thread communication
On Tuesday, 4 August 2015 at 18:15:08 UTC, Ali Çehreli wrote:
> On 08/04/2015 09:19 AM, Dicebot wrote:
>> receiveTimeout
>
> I think the problem here is that the worker is busy, not even able to call that.
>
> This sounds like sending a signal to the specific thread (with pthread_kill()) but I don't know the details of it nor whether Phobos supports it.
>
> Ali

The problem is that it works to a certain extent with receiveTimeout. However, if the input arrives in very short intervals, all the solutions I've come up with so far (including data sharing) fail sooner or later. New threads are spawned faster than old ones can be given the abort signal. There are ways to wait until a given thread dies, say with a shared variable isAlive `while (isAlive) {}`, but even here I've come across problems when the input comes very fast.

I don't know how to solve this problem, because message passing follows a linear protocol (as far as I understand it) and shared variables give rise to data races. Something like pthread_kill() would indeed be useful, to terminate a thread at random. I wonder if fibers would be an option.

D-threads seem to be based on the assumption that there is no need to abort threads at random, any time. Or am I mistaken?
Re: Thread communication
On Tuesday, 4 August 2015 at 15:19:51 UTC, Chris wrote:
> I want to stop (and abort) the worker as soon as new input arrives. However, while executing the function that contains the foreach-loop the worker thread doesn't listen, because it's busy, of course.

I think this is a matter of architecture. If you want to use message-passing and you want the worker to react quickly to new events, this means it needs to check for new messages (via receiveTimeout) often enough, there's no way around it.
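A minimal sketch of that polling architecture follows, assuming the work can be cut into short slices. The names `Cancel`, `pollingWorker`, `stepsBeforeCancel` and `totalSteps` are made up for illustration, and the 100 ms sleep stands in for one slice of real work. A zero timeout turns `receiveTimeout` into a non-blocking mailbox check.

```d
import core.thread : Thread;
import core.time : Duration, msecs;
import std.concurrency;
import std.stdio;

// Hypothetical message type asking the worker to stop.
struct Cancel {}

enum totalSteps = 10;

// Hypothetical worker: does its job in slices and polls between them.
void pollingWorker()
{
    string input;
    receive((string s) { input = s; });

    int done = 0;
    bool cancelled = false;
    while (done < totalSteps && !cancelled)
    {
        // Zero timeout: check the mailbox without waiting.
        receiveTimeout(Duration.zero, (Cancel _) { cancelled = true; });
        if (cancelled)
            break;

        Thread.sleep(100.msecs); // stand-in for one slice of work on input
        ++done;
    }

    // Report how many slices were finished before stopping.
    ownerTid.send(done);
}

// Starts a worker, cancels it after `delay`, returns the finished slices.
int stepsBeforeCancel(Duration delay)
{
    auto worker = spawn(&pollingWorker);
    worker.send("some input");
    Thread.sleep(delay);
    worker.send(Cancel());
    return receiveOnly!int();
}

void main()
{
    writeln("slices finished before cancel: ", stepsBeforeCancel(250.msecs));
}
```

The reaction time is bounded by the length of one slice, so the trade-off is between polling overhead and how quickly the worker notices the `Cancel` message.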
Re: Thread communication
On 08/04/2015 09:19 AM, Dicebot wrote:
> receiveTimeout

I think the problem here is that the worker is busy, not even able to call that.

This sounds like sending a signal to the specific thread (with pthread_kill()) but I don't know the details of it nor whether Phobos supports it.

Ali
Re: Thread communication
receiveTimeout
Thread communication
Is there a good way to stop work-intensive threads via thread communication (instead of using a shared variable)? The example below is very basic and naive and only meant to exemplify the basic problem.

I want to stop (and abort) the worker as soon as new input arrives. However, while executing the function that contains the foreach-loop the worker thread doesn't listen, because it's busy, of course. I've tried a few solutions with send and receive in this block, but somehow none of them work perfectly.

//===
import std.stdio : readln, writefln, writeln;
import std.string : strip;
import std.concurrency;
import core.thread;

Tid thread1;

struct Exit {}

void main() {
    string input;
    bool exists;
    while ((input = readln.strip) != null) {
        if (exists) {
            thread1.send(Exit());
        }
        thread1 = spawn(&worker);
        exists = true;
        thread1.send(input.idup);
    }
}

void worker() {
    bool run = true;
    while (run) {
        receive(
            (string input) {
                foreach (ref i; 0..10) {
                    writefln("%d.\tDoing something with input %s", i+1, input);
                    Thread.sleep(500.msecs);
                }
                run = false;
            },
            (Exit exit) {
                run = false;
            }
        );
    }
    writeln("End of thread worker");
}
//===
Re: std.concurrency thread communication problem
On Saturday, May 17, 2014 12:59:22 PM Ali Çehreli via Digitalmars-d-learn wrote:
> The following is what I've come up with. I had to use a number of
> shared-related casts.
> [...]

Thank you immensely. That is precisely the kind of information I was hoping for.
Re: std.concurrency thread communication problem
On 05/17/2014 12:33 PM, John Colvin wrote:
> On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via Digitalmars-d-learn wrote:
>> I'm building a program which I intend to have many threads that can each send messages to (and receive messages from) each other. The obvious way to do this would be to have a shared array of Tids, but this seems to not work. I'm continually fighting the system to get it to compile, and this makes me think it should probably be done some other way...but what?
>>
>> One possibility is to have each thread maintain a separate array that contains all the threads, which would mean that they would need to be initialized after they were created. This would avoid the problems of shared Tids, but each Tid contains a private mailbox, so this would be being duplicated, and that bothers me...it seems like a poor idea. (Maybe I'm wrong about that...but I don't know.)
>
> If my understanding is correct, each Tid contains a reference to the corresponding thread's MessageBox (implemented by way of MessageBox being a class), not an independent instance. You should be fine to just have an array of the relevant Tids in each thread.
>
> Alternatively, a single __gshared array of threads should work, given you are sufficiently careful with it. Remember, if no-one is doing any writing then you don't need to do any synchronisation of reads.

The following is what I've come up with. I had to use a number of shared-related casts.

import std.stdio;
import std.concurrency;
import std.datetime;
import std.random;
import core.thread;

enum threadCount = 5;
enum messagePerThread = 3;

// Represents messages sent to threads to start their tasks
struct Start
{}

// Receives the number (id) of this thread and the workers to send
// messages to
void workerFunc(size_t id, shared(Tid)[] workers)
{
    receiveOnly!Start();

    // A local function to reduce code duplication
    bool checkMessageForMe(Duration timeout)
    {
        return receiveTimeout(
            timeout,
            (size_t from) {
                writefln("%s received from %s", id, from);
            });
    }

    // My main task is to send messages to others:
    size_t totalSent = 0;
    while (totalSent < messagePerThread) {
        auto to = uniform(0, workers.length);

        // Only send to others; not to self
        if (to != id) {
            auto chosen = cast(Tid)workers[to];
            writefln("%s sending to %s", id, to);
            chosen.send(id);
            ++totalSent;
        }

        checkMessageForMe(0.seconds);
    }

    // Process trailing messages sent to me
    bool received = false;
    do {
        received = checkMessageForMe(10.msecs);
    } while (received);
}

void main()
{
    auto workers = new shared(Tid)[threadCount];

    foreach (id; 0 .. threadCount) {
        auto worker = spawn(&workerFunc, id, workers);
        workers[id] = cast(shared(Tid))worker;
    }

    foreach (sharedWorker; workers) {
        auto worker = cast(Tid)sharedWorker;
        worker.send(Start());
    }

    thread_joinAll();
}

Sample output:

0 sending to 2
4 sending to 3
4 sending to 2
1 sending to 4
3 received from 4
3 sending to 2
0 sending to 1
4 received from 1
1 received from 0
1 sending to 0
0 received from 1
0 sending to 1
1 received from 0
1 sending to 0
0 received from 1
3 sending to 2
4 sending to 2
2 sending to 0
2 received from 0
2 received from 4
3 sending to 1
2 sending to 3
0 received from 2
1 received from 3
2 received from 3
2 sending to 0
3 received from 2
0 received from 2
2 received from 3
2 received from 4

Ali
Re: std.concurrency thread communication problem
On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via Digitalmars-d-learn wrote:
> I'm building a program which I intend to have many threads that can each send messages to (and receive messages from) each other. The obvious way to do this would be to have a shared array of Tids, but this seems to not work. I'm continually fighting the system to get it to compile, and this makes me think it should probably be done some other way...but what?
>
> One possibility is to have each thread maintain a separate array that contains all the threads, which would mean that they would need to be initialized after they were created. This would avoid the problems of shared Tids, but each Tid contains a private mailbox, so this would be being duplicated, and that bothers me...it seems like a poor idea. (Maybe I'm wrong about that...but I don't know.)

If my understanding is correct, each Tid contains a reference to the corresponding thread's MessageBox (implemented by way of MessageBox being a class), not an independent instance. You should be fine to just have an array of the relevant Tids in each thread.

Alternatively, a single __gshared array of threads should work, given you are sufficiently careful with it. Remember, if no-one is doing any writing then you don't need to do any synchronisation of reads.
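The first suggestion above (a private array of Tids in each thread) can be sketched without any shared casts by sending every worker its peers' Tids as ordinary messages. This is only one possible arrangement under that assumption; `peerWorker` and `runPeers` are made-up names, and each worker simply sums the ids it hears from its peers.

```d
import std.concurrency;
import std.stdio;

// Hypothetical worker: learns its peers from messages and keeps
// them in a thread-local array.
void peerWorker(size_t id, size_t peerCount)
{
    // receive() leaves non-matching messages queued, so an early
    // size_t from a fast peer does not disturb the Tid hand-out.
    Tid[] peers;
    while (peers.length < peerCount)
        receive((Tid peer) { peers ~= peer; });

    // Tell every peer who we are.
    foreach (peer; peers)
        peer.send(id);

    // Collect exactly one id from every peer.
    size_t sum = 0;
    foreach (_; 0 .. peerCount)
        receive((size_t from) { sum += from; });

    ownerTid.send(id, sum);
}

// Spawns the workers, introduces them to each other, and returns
// the sum of peer ids that each worker reported.
size_t[] runPeers(size_t workerCount)
{
    Tid[] workers;
    foreach (size_t id; 0 .. workerCount)
        workers ~= spawn(&peerWorker, id, workerCount - 1);

    foreach (i, worker; workers)
        foreach (j, peer; workers)
            if (i != j)
                worker.send(peer);

    auto sums = new size_t[workerCount];
    foreach (_; 0 .. workerCount)
        receive((size_t id, size_t sum) { sums[id] = sum; });
    return sums;
}

void main()
{
    writeln(runPeers(4));
}
```

The main thread acts as a switchboard only during start-up; after the Tids are handed out, the workers talk to each other directly.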
std.concurrency thread communication problem
I'm building a program which I intend to have many threads that can each send messages to (and receive messages from) each other. The obvious way to do this would be to have a shared array of Tids, but this seems to not work. I'm continually fighting the system to get it to compile, and this makes me think it should probably be done some other way...but what?

One possibility is to have each thread maintain a separate array that contains all the threads, which would mean that they would need to be initialized after they were created. This would avoid the problems of shared Tids, but each Tid contains a private mailbox, so this would be being duplicated, and that bothers me...it seems like a poor idea. (Maybe I'm wrong about that...but I don't know.)

I do know that I want an n by n communication matrix (leaving out the main thread), with each thread sending messages to all the others. (Well, except for a few that I haven't really defined yet, but which handle separated functions.) My plan was to have each thread run an execution loop which frequently checked for messages received in between performing its own functions. They are not intended to synchronize with each other. They are not intended to be temporary, i.e., each of these threads would be started shortly after program initialization, and continue running until program termination.

But how should I get them to know each other's address? I don't want the main thread to need to act as a switchboard between all the others, though I guess that would "sort of" work. (Actually, if I need to do that, that job would be pulled off into yet another thread...and I end up with more threads than processors. Still, that's a design that is possible, IIUC.)

Any comments or suggestions?