Re: Multi-Thread message passing approach

2016-08-16 Thread Charles Hixson via Digitalmars-d-learn



On 08/16/2016 07:21 AM, Kagamin via Digitalmars-d-learn wrote:

On Monday, 15 August 2016 at 01:53:33 UTC, Charles Hixson wrote:
If I modify the code to attempt to pass a Tid[] as a member of struct 
Start I get:


/usr/include/dmd/phobos/std/concurrency.d(603): Error: static assert  
"Aliases to mutable thread-local data not allowed."

test.d(47):instantiated from here: send!(Start)

and this seems to imply that the casting away of shared might also be 
an unsafe access which just doesn't happen to be detected by the 
library.


Currently, not all types that are supposed to be thread-safe support 
shared (Semaphore, for example), so a missing shared qualifier doesn't 
necessarily mean that sharing them is incorrect.


Thanks.  But "not all types" is a bit worrisome.  Fortunately it's just 
as easy to pass a fixed length array of Tids through send, and that even 
yields simpler code.  The problems I was running into turned out to be 
because there was a "local reference", i.e. the dynamic array.  I just 
wasn't understanding the error message.  So now I don't need to use the 
"shared data" that I was worrying about.  I am a bit surprised, however, 
as Tids include a private class reference, and it seems as if that 
should raise the same error flag, but it doesn't.
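
A minimal sketch of why the dynamic array was the culprit. It assumes that the static assert in send is built on std.traits.hasUnsharedAliasing (my reading of the error message, not something confirmed in this thread), and the struct names ByValue and ByRef are hypothetical. It uses int rather than Tid so that it does not depend on how Tid itself is treated:

```d
import std.traits : hasUnsharedAliasing;

// A dynamic array is a (pointer, length) pair referring to mutable,
// thread-local memory, so it counts as unshared aliasing.
static assert( hasUnsharedAliasing!(int[]));

// A fixed-length array is stored inline: copying it copies the elements.
static assert(!hasUnsharedAliasing!(int[3]));

// Immutable data may be aliased freely, which is why strings can be sent.
static assert(!hasUnsharedAliasing!(immutable(int)[]));

struct ByValue { int count; int[3] items; }   // hypothetical: no indirection
struct ByRef   { int count; int[]  items; }   // hypothetical: holds a slice

static assert(!hasUnsharedAliasing!ByValue);
static assert( hasUnsharedAliasing!ByRef);

void main() {}
```

All of the checks run at compile time, so if the file compiles, each claim in the comments holds for the compiler in use.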


Of course, the real problem is that concurrent software that seems to 
work when you test it tells you little about how it will behave under 
load, so I want to avoid anything even questionable.


Re: Multi-Thread message passing approach

2016-08-16 Thread Kagamin via Digitalmars-d-learn

On Monday, 15 August 2016 at 01:53:33 UTC, Charles Hixson wrote:
If I modify the code to attempt to pass a Tid[] as a member of 
struct Start I get:


/usr/include/dmd/phobos/std/concurrency.d(603): Error: static 
assert  "Aliases to mutable thread-local data not allowed."

test.d(47):instantiated from here: send!(Start)

and this seems to imply that the casting away of shared might 
also be an unsafe access which just doesn't happen to be 
detected by the library.


Currently, not all types that are supposed to be thread-safe 
support shared (Semaphore, for example), so a missing shared 
qualifier doesn't necessarily mean that sharing them is incorrect.


Re: Multi-Thread message passing approach

2016-08-15 Thread Charles Hixson via Digitalmars-d-learn
I misunderstood the problem.  The problem was that a dynamically sized 
array of mutable elements (here a Tid[]) cannot be sent as a message, 
while a fixed-length array can.  So this works:


import   std.concurrency;
import   std.stdio;

import   core.thread;

enum  tidMax   =  10;
struct   Start {  int   tidCnt   =  0; Tid[tidMax] tids; }
struct   Msg   {  int   orig; int   dest; }
struct   Done  {  int   dummy =  0; }

void  worker (int ndx)
{
   writeln  ("worker ", ndx, " spawned");
   Start start =  receiveOnly!(Start)();
   Tid[] tids;
   foreach (i; 0 .. start.tidCnt)   {  tids  ~= start.tids[i]; }
   writeln  ("worker ", ndx, " got tidList");
   // use the count received in Start rather than a hard-coded 3
   for   (int i = 0; i < start.tidCnt;   i++)
   {  if (i != ndx)
      {  Msg msg  =  Msg(ndx, i);
         send (tids[i], msg);
      }
   }
   writeln  ("worker ", ndx, " sent messages");
   bool  done  =  false;
   while (!done)
   {  receive
      (  (Msg msg)   {  writeln ("msg from: ", msg.orig, ", to: ", msg.dest); },
         (Done d) {  done = true;   }
      );
   }
   writeln ("worker ", ndx, " is done");
}

void  main()
{
   Start start;
   Done  done;
   for   (int i = 0; i < 3;   i++)
   {  auto tid =  spawn (&worker, i);
      start.tids[start.tidCnt++] =  tid;
   }
   foreach (i; 0 .. start.tidCnt)   {  send (start.tids[i], start); }
   Thread.sleep (1000.msecs);
   foreach (i; 0 .. start.tidCnt)   {  send (start.tids[i], done); }
   Thread.sleep (1000.msecs);
   writeln ("main is done");
}



Re: Multi-Thread message passing approach

2016-08-14 Thread Charles Hixson via Digitalmars-d-learn
Looking at the std.concurrency code, it appears that Tid is just a 
handle to a class, so multiple assignments should all refer to the same 
underlying class, and it looks like that underlying class (MessageBox) 
uses mutexes to ensure safe handling of multiple access.  So this shared 
access to tids should be safe.  But in that case why can't Tids be 
passed as function parameters to spawn, which would have avoided the 
requirement for a fixed length shared array?  So I don't trust my 
reasoning here.
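
For reference, a minimal sketch showing that a single Tid can be handed to spawn as an ordinary argument, so presumably it is only the dynamic Tid[] that trips the check. The names doubler and roundTrip are hypothetical and exist only for this illustration:

```d
import std.concurrency;
import std.stdio;

// Hypothetical worker: the Tid to reply to arrives as an ordinary argument.
void doubler(Tid replyTo, int value)
{
    send(replyTo, value * 2);
}

// Spawns one worker, hands it our own Tid, and waits for its reply.
int roundTrip(int value)
{
    spawn(&doubler, thisTid, value);
    return receiveOnly!int();
}

void main()
{
    writeln("reply: ", roundTrip(21));   // prints "reply: 42"
}
```

This only covers one Tid per spawn call. A worker that needs the full list still has to receive it in a message after every thread has been spawned, since the later Tids do not exist yet when the first worker starts.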


If I modify the code to attempt to pass a Tid[] as a member of struct 
Start I get:


/usr/include/dmd/phobos/std/concurrency.d(603): Error: static assert  
"Aliases to mutable thread-local data not allowed."

test.d(47):instantiated from here: send!(Start)

and this seems to imply that the casting away of shared might also be an 
unsafe access which just doesn't happen to be detected by the library.


On 08/14/2016 07:44 AM, Charles Hixson via Digitalmars-d-learn wrote:
This is an approach to n x n thread message passing. The idea is that 
each thread should be able to pass messages to any other thread.  The 
only alternative I've come up with involves the main thread handling 
each message.  Is that a better approach?  Is there a better way to 
pass lists of Tids?


import   std.concurrency;
import   std.stdio;

import   core.thread;

shared   Tid[3]   tidList;

struct   Start {  int   dummy =  0; }
struct   Msg   {  int   orig; int   dest; }
struct   Done  {  int   dummy =  0; }

void  worker (int ndx)
{
   writeln  ("worker ", ndx, " spawned");
   {  auto  msg   =  receiveOnly!(Start)();
      writeln  ("worker ", ndx, " started");
   }
   Tid[] tids;
   foreach (t; tidList) {  tids  ~= cast(Tid)t;  }
   for   (int i = 0; i < 3;   i++)
   {  if (i != ndx)
      {  Msg msg  =  Msg(ndx, i);
         send (tids[i], msg);
      }
   }
   writeln  ("worker ", ndx, " got tidList");
   bool  done  =  false;
   while (!done)
   {  receive
      (  (Msg msg)   {  writeln ("msg from: ", msg.orig, ", to: ", msg.dest); },
         (Done d) {  done = true;   }
      );
   }
   writeln ("worker ", ndx, " is done");
}

void  main()
{
   Start start;
   Done  done;
   for   (int i = 0; i < 3;   i++)
   {  auto tid =  spawn (&worker, i);
      tidList[i]  =  cast(shared Tid)tid;
   }
   foreach (i; 0 .. 3)  {  send (cast(Tid)tidList[i], start);  }
   Thread.sleep (1000.msecs);
   foreach (i; 0 .. 3)  {  send (cast(Tid)tidList[i], done);  }
   Thread.sleep (1000.msecs);
   writeln ("main is done");
}






Re: Multi-Thread message passing approach

2016-08-14 Thread Charles Hixson via Digitalmars-d-learn

On 08/14/2016 07:44 AM, Charles Hixson via Digitalmars-d-learn wrote:

This is an approach to n x n thread message passing. The idea is that 
each thread should be able to pass messages to any other thread.  The 
only alternative I've come up with involves the main thread handling 
each message.  Is that a better approach?  Is there a better way to 
pass lists of Tids?


An attempt using the main thread as a relay follows.  I have fewer 
doubts about it working, as Tids aren't accessed by multiple threads, 
but the logic is more convoluted, and harder to get right.


import   std.concurrency;
import   std.stdio;

import   core.thread;

struct   Start {  int   dummy =  0; }
struct   Msg   {  int   orig; int   dest; }
struct   Done  {  int   dummy =  0; }

enum  threadCnt   =  3;

void  worker (int ndx)
{
   writeln  ("worker ", ndx, " spawned");
   {  auto  msg   =  receiveOnly!(Start)();
      writeln  ("worker ", ndx, " started");
   }
   Tid   owner =  ownerTid;
   for   (int i = 0; i < threadCnt;   i++)
   {  if (i != ndx)
      {  Msg msg  =  Msg(ndx, i);
         send (owner, msg);
      }
   }
   writeln  ("worker ", ndx, " sent msgs");
   bool  done  =  false;
   while (!done)
   {  receive
      (  (Msg msg)   {  writeln ("msg from: ", msg.orig, ", to: ", msg.dest); },
         (Done d) {  done = true;   writeln ("thread ", ndx, " is done.");   }
      );
   }
   send (owner, Done(ndx));
   writeln ("worker ", ndx, " is done");
}

void  main()
{  Tid[] tidList;
   Start start;
   Done  done;
   for   (int i = 0; i < threadCnt;   i++)
   {  tidList  ~=spawn (&worker, i);
   }
   yield;
   foreach (i; 0 .. threadCnt)  {  send (tidList[i], start);  }
   int   tidLength   =  cast(int)tidList.length;
   Thread.sleep (500.msecs);
   foreach (i; 0 .. threadCnt)  {  send (tidList[i], done);  }

   while (tidLength > 0)
   {  receive
      (  (Msg msg)   {  send (tidList[msg.dest], msg);   },
         (Done d) { writeln ("tid ", d.dummy, " is done..", --tidLength, " remaining.");  }
      );
   }
   writeln ("main is done");
}
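
One thing to watch in the relay version: as far as I can tell, main sends Done to every worker before it starts relaying, so the workers may leave their receive loops before any relayed Msg reaches them. Below is a sketch of one possible ordering, assuming each worker sends exactly threadCnt - 1 messages so that main can count them. Report and runRelay are hypothetical names introduced for this illustration, and the Start handshake is left out to keep it short:

```d
import std.concurrency;
import std.stdio;

struct Msg    { int orig; int dest; }
struct Done   { int dummy = 0; }
struct Report { int ndx; int received; }   // hypothetical reply type

enum threadCnt = 3;

void worker(int ndx)
{
    // One message for every other worker, all routed through the owner.
    foreach (i; 0 .. threadCnt)
        if (i != ndx)
            send(ownerTid, Msg(ndx, i));

    int  received = 0;
    bool done     = false;
    while (!done)
    {
        receive(
            (Msg msg) { ++received; },
            (Done d)  { done = true; }
        );
    }
    // Tell the owner how many relayed messages actually arrived.
    send(ownerTid, Report(ndx, received));
}

int runRelay()
{
    Tid[] tidList;
    foreach (i; 0 .. threadCnt)
        tidList ~= spawn(&worker, i);

    // Relay first: every worker sends threadCnt - 1 messages.
    int pending = threadCnt * (threadCnt - 1);
    while (pending > 0)
        receive((Msg msg) { send(tidList[msg.dest], msg); --pending; });

    // Only now ask the workers to stop. Each Done is queued behind the
    // messages main already relayed to that worker.
    foreach (tid; tidList)
        send(tid, Done());

    int delivered = 0;
    foreach (i; 0 .. threadCnt)
        receive((Report r) { delivered += r.received; });
    return delivered;
}

void main()
{
    writeln("delivered: ", runRelay());   // prints "delivered: 6"
}
```

Counting the messages removes the need for Thread.sleep, but it only works when main knows in advance how many messages to expect. With an open-ended workload, the workers would have to announce that they have finished sending before main can safely send Done.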