Re: std.concurrency thread communication problem

2014-05-17 Thread Charles Hixson via Digitalmars-d-learn
On Saturday, May 17, 2014 12:59:22 PM Ali Çehreli via Digitalmars-d-learn 
wrote:
> On 05/17/2014 12:33 PM, John Colvin wrote:
> > On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via
> > 
> > Digitalmars-d-learn wrote:
> >> I'm building a program which I intend to have many threads that can
> >> each send
> >> messages to (and receive messages from) each other.  The obvious way
> >> to do
> >> this would be to have a shared array of Tids, but this seems to not
> >> work.  I'm
> >> continually fighting the system to get it to compile, and this makes
> >> me think
> >> it should probably be done some other way...but what?
> >> 
> >> One possibility is to have each thread maintain a separate array that
> >> contains
> >> all the threads, which would mean that they would need to be
> >> initialized after
> >> they were created.  This would avoid the problems of shared Tids, but
> >> each Tid
> >> contains a private mailbox, so this would be being duplicated, and that
> >> bothers me...it seems like a poor idea.  (Maybe I'm wrong about
> >> that...but I
> >> don't know.)
> > 
> > If my understanding is correct, each Tid contains a reference to the
> > corresponding thread's MessageBox (implemented by way of MessageBox
> > being a class), not an independent instance. You should be fine to just
> > have an array of the relevant Tids in each thread.
> > 
> > Alternatively, a single __gshared array of threads should work, given
> > you are sufficiently careful with it. Remember, if no-one is doing any
> > writing then you don't need to do any synchronisation of reads.
> 
> The following is what I've come up with. I had to use a number of
> shared-related casts.
> 
> import std.stdio;
> import std.concurrency;
> import std.datetime;
> import std.random;
> import core.thread;
> 
> enum threadCount = 5;
> enum messagePerThread = 3;
> 
> // Represents messages sent to threads to start their tasks
> struct Start
> {}
> 
> // Receives the number (id) of this thread and the workers to send
> messages to
> void workerFunc(size_t id, shared(Tid)[] workers)
> {
>  receiveOnly!Start();
> 
>  // A local function to reduce code duplication
>  bool checkMessageForMe(Duration timeout)
>  {
>  return receiveTimeout(
>  timeout,
>  (size_t from) {
>  writefln("%s received from %s", id, from);
>  });
>  }
> 
>  // My main task is to send messages to others:
>  size_t totalSent = 0;
>  while (totalSent < messagePerThread) {
>  auto to = uniform(0, workers.length);
> 
>  // Only send to others; not to self
>  if (to != id) {
>  auto chosen = cast(Tid)workers[to];
>  writefln("%s sending to %s", id, to);
>  chosen.send(id);
>  ++totalSent;
>  }
> 
>  checkMessageForMe(0.seconds);
>  }
> 
>  // Process trailing messages sent to me
>  bool received = false;
>  do {
>  received = checkMessageForMe(10.msecs);
>  } while (received);
> }
> 
> void main()
> {
>  auto workers = new shared(Tid)[threadCount];
> 
>  foreach (id; 0 .. threadCount) {
>  auto worker = spawn(&workerFunc, id, workers);
>  workers[id] = cast(shared(Tid))worker;
>  }
> 
>  foreach (sharedWorker; workers) {
>  auto worker = cast(Tid)sharedWorker;
>  worker.send(Start());
>  }
> 
>  thread_joinAll();
> }
> 
> Sample output:
> 
> 0 sending to 2
> 4 sending to 3
> 4 sending to 2
> 1 sending to 4
> 3 received from 4
> 3 sending to 2
> 0 sending to 1
> 4 received from 1
> 1 received from 0
> 1 sending to 0
> 0 received from 1
> 0 sending to 1
> 1 received from 0
> 1 sending to 0
> 0 received from 1
> 3 sending to 2
> 4 sending to 2
> 2 sending to 0
> 2 received from 0
> 2 received from 4
> 3 sending to 1
> 2 sending to 3
> 0 received from 2
> 1 received from 3
> 2 received from 3
> 2 sending to 0
> 3 received from 2
> 0 received from 2
> 2 received from 3
> 2 received from 4
> 
> Ali
Thank you immensely.  That is precisely the kind of information I was hoping 
for. 




Re: std.concurrency thread communication problem

2014-05-17 Thread Ali Çehreli via Digitalmars-d-learn

On 05/17/2014 12:33 PM, John Colvin wrote:

On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via
Digitalmars-d-learn wrote:

I'm building a program which I intend to have many threads that can
each send
messages to (and receive messages from) each other.  The obvious way
to do
this would be to have a shared array of Tids, but this seems to not
work.  I'm
continually fighting the system to get it to compile, and this makes
me think
it should probably be done some other way...but what?

One possibility is to have each thread maintain a separate array that
contains
all the threads, which would mean that they would need to be
initialized after
they were created.  This would avoid the problems of shared Tids, but
each Tid
contains a private mailbox, so this would be being duplicated, and that
bothers me...it seems like a poor idea.  (Maybe I'm wrong about
that...but I
don't know.)


If my understanding is correct, each Tid contains a reference to the
corresponding thread's MessageBox (implemented by way of MessageBox
being a class), not an independent instance. You should be fine to just
have an array of the relevant Tids in each thread.

Alternatively, a single __gshared array of threads should work, given
you are sufficiently careful with it. Remember, if no-one is doing any
writing then you don't need to do any synchronisation of reads.


The following is what I've come up with. I had to use a number of 
shared-related casts.


import std.stdio;
import std.concurrency;
import std.datetime;
import std.random;
import core.thread;

enum threadCount = 5;
enum messagePerThread = 3;

// Represents messages sent to threads to start their tasks
struct Start
{}

// Receives the number (id) of this thread and the workers to send 
messages to

void workerFunc(size_t id, shared(Tid)[] workers)
{
receiveOnly!Start();

// A local function to reduce code duplication
bool checkMessageForMe(Duration timeout)
{
return receiveTimeout(
timeout,
(size_t from) {
writefln("%s received from %s", id, from);
});
}

// My main task is to send messages to others:
size_t totalSent = 0;
while (totalSent < messagePerThread) {
auto to = uniform(0, workers.length);

// Only send to others; not to self
if (to != id) {
auto chosen = cast(Tid)workers[to];
writefln("%s sending to %s", id, to);
chosen.send(id);
++totalSent;
}

checkMessageForMe(0.seconds);
}

// Process trailing messages sent to me
bool received = false;
do {
received = checkMessageForMe(10.msecs);
} while (received);
}

void main()
{
auto workers = new shared(Tid)[threadCount];

foreach (id; 0 .. threadCount) {
auto worker = spawn(&workerFunc, id, workers);
workers[id] = cast(shared(Tid))worker;
}

foreach (sharedWorker; workers) {
auto worker = cast(Tid)sharedWorker;
worker.send(Start());
}

thread_joinAll();
}

Sample output:

0 sending to 2
4 sending to 3
4 sending to 2
1 sending to 4
3 received from 4
3 sending to 2
0 sending to 1
4 received from 1
1 received from 0
1 sending to 0
0 received from 1
0 sending to 1
1 received from 0
1 sending to 0
0 received from 1
3 sending to 2
4 sending to 2
2 sending to 0
2 received from 0
2 received from 4
3 sending to 1
2 sending to 3
0 received from 2
1 received from 3
2 received from 3
2 sending to 0
3 received from 2
0 received from 2
2 received from 3
2 received from 4

Ali



Re: std.concurrency thread communication problem

2014-05-17 Thread John Colvin via Digitalmars-d-learn
On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via 
Digitalmars-d-learn wrote:
I'm building a program which I intend to have many threads that 
can each send
messages to (and receive messages from) each other.  The 
obvious way to do
this would be to have a shared array of Tids, but this seems to 
not work.  I'm
continually fighting the system to get it to compile, and this 
makes me think

it should probably be done some other way...but what?

One possibility is to have each thread maintain a separate 
array that contains
all the threads, which would mean that they would need to be 
initialized after
they were created.  This would avoid the problems of shared 
Tids, but each Tid
contains a private mailbox, so this would be being duplicated, 
and that
bothers me...it seems like a poor idea.  (Maybe I'm wrong about 
that...but I

don't know.)


If my understanding is correct, each Tid contains a reference to 
the corresponding thread's MessageBox (implemented by way of 
MessageBox being a class), not an independent instance. You 
should be fine to just have an array of the relevant Tids in each 
thread.


Alternatively, a single __gshared array of threads should work, 
given you are sufficiently careful with it. Remember, if no-one 
is doing any writing then you don't need to do any 
synchronisation of reads.


std.concurrency thread communication problem

2014-05-17 Thread Charles Hixson via Digitalmars-d-learn
I'm building a program which I intend to have many threads that can each send 
messages to (and receive messages from) each other.  The obvious way to do 
this would be to have a shared array of Tids, but this seems to not work.  I'm 
continually fighting the system to get it to compile, and this makes me think 
it should probably be done some other way...but what?

One possibility is to have each thread maintain a separate array that contains 
all the threads, which would mean that they would need to be initialized after 
they were created.  This would avoid the problems of shared Tids, but each Tid 
contains a private mailbox, so this would be being duplicated, and that 
bothers me...it seems like a poor idea.  (Maybe I'm wrong about that...but I 
don't know.)

I do know that I want a n by n communication matrix (leaving out the main 
thread), with each thread sending messages to all to others.  (Well, except 
for a few that I haven't really defined yet, but which handle separated 
functions.)  My plan was to have each thread run an execution loop which 
frequently checked for messages received in between performing its own 
functions.  They are not intended to synchronize with each other.  They are 
not intended to be temporary, i.e., each of these threads would be started 
shortly after program initialization, and continue running until program 
termination.  But how should I get them to know each other's address?

I don't want the main thread to need to act as a switchboard between all the 
others, though I guess that would "sort of" work.  (Actually, if I need to do 
that, that job would be pulled off into yet another thread...and I end up with 
more threads than processors.  Still, that's a design that is possible, IIUC.)

Any comments or suggestions?