Re: Concurrency Confusion
On Monday, 10 August 2015 at 22:50:15 UTC, sigod wrote:
> On Monday, 10 August 2015 at 22:21:18 UTC, 岩倉 澪 wrote:
>> Took a look in phobos and it appears to be from this line:
>> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1904
>
> It looks like you're trying to use `receiveTimeout` like this:
>
>     bool value;
>     receiveTimeout(-1.msecs, value);

Ah, nope, I just assumed the closest assert to the line mentioned in the assertion failure was the culprit without thinking about it much. You are correct that the assertion your pull request removes is the one that gave me trouble. I'll leave it as 0.msecs until your pull request is merged and a new release made, thanks for the help!
Re: Concurrency Confusion
On Monday, 10 August 2015 at 22:31:33 UTC, sigod wrote:
> On Monday, 10 August 2015 at 22:21:18 UTC, 岩倉 澪 wrote:
>> [...]
>
> It should be this line:
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1910
>
>> [...]
>
> This line is still there:
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L2081
>
> I'll remove the mentioned assert.

https://github.com/D-Programming-Language/phobos/pull/3545
Re: Concurrency Confusion
On Monday, 10 August 2015 at 22:21:18 UTC, 岩倉 澪 wrote:
> On Saturday, 8 August 2015 at 06:24:30 UTC, sigod wrote:
>> Use negative value for `receiveTimeout`.
>> http://stackoverflow.com/q/31616339/944911
>
> actually this no longer appears to be true? Passing -1.msecs as the duration gives me an assertion failure:
>
>     core.exception.AssertError@std/concurrency.d(1902): Assertion failure
>
> Took a look in phobos and it appears to be from this line:
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1904

It looks like you're trying to use `receiveTimeout` like this:

    bool value;
    receiveTimeout(-1.msecs, value);

But you must use it like this:

    bool value;
    receiveTimeout(-1.msecs, (bool b) { value = b; });

See [`receive`][0] for example.

[0]: http://dlang.org/phobos/std_concurrency.html#.receive
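
For readers following along, here is a minimal sketch of the handler form described above. The `worker` and `waitForFlag` names are made up for illustration, and a positive timeout is used because the thread reports that a negative one trips an assert in the Phobos version under discussion.

```d
import core.time : msecs;
import std.concurrency : ownerTid, receiveTimeout, send, spawn;

// Hypothetical worker: sends a single bool back to the thread that spawned it.
void worker()
{
    send(ownerTid, true);
}

// Hypothetical helper: waits up to two seconds for a bool message.
// Returns false if nothing arrived in time.
bool waitForFlag()
{
    bool value;
    spawn(&worker);
    // receiveTimeout takes handlers, not variables; the handler assigns
    // the received message to the outer variable.
    immutable got = receiveTimeout(2000.msecs, (bool b) { value = b; });
    return got && value;
}

void main()
{
    assert(waitForFlag());
}
```

The return value of `receiveTimeout` only says whether a message was handled, which is why the payload has to be copied out inside the delegate.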
Re: Concurrency Confusion
On Monday, 10 August 2015 at 22:21:18 UTC, 岩倉 澪 wrote:
> On Saturday, 8 August 2015 at 06:24:30 UTC, sigod wrote:
>> Use negative value for `receiveTimeout`.
>> http://stackoverflow.com/q/31616339/944911
>
> actually this no longer appears to be true? Passing -1.msecs as the duration gives me an assertion failure:
>
>     core.exception.AssertError@std/concurrency.d(1902): Assertion failure
>
> Took a look in phobos and it appears to be from this line:
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1904

It should be this line:
https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1910

> If you look at the implementation of receiveTimeout, you'll see that it no longer has these lines from the stack overflow answer:
>
>     if( period.isNegative || !m_putMsg.wait( period ) )
>         return false;
>
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L824

This line is still there:
https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L2081

I'll remove the mentioned assert.
Re: Concurrency Confusion
On Monday, 10 August 2015 at 22:21:18 UTC, 岩倉 澪 wrote:
> On Saturday, 8 August 2015 at 06:24:30 UTC, sigod wrote:
>> Use negative value for `receiveTimeout`.
>> http://stackoverflow.com/q/31616339/944911
>
> actually this no longer appears to be true? Passing -1.msecs as the duration gives me an assertion failure:
>
>     core.exception.AssertError@std/concurrency.d(1902): Assertion failure
>
> Took a look in phobos and it appears to be from this line:
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1904
>
> If you look at the implementation of receiveTimeout, you'll see that it no longer has these lines from the stack overflow answer:
>
>     if( period.isNegative || !m_putMsg.wait( period ) )
>         return false;
>
> https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L824

That's weird. Especially when latest commit is mine:
https://github.com/D-Programming-Language/phobos/commit/c8048fa48832a97f033b748f5e6b8edde3f2ae29
Re: Concurrency Confusion
On Saturday, 8 August 2015 at 06:24:30 UTC, sigod wrote:
> Use negative value for `receiveTimeout`.
> http://stackoverflow.com/q/31616339/944911

Actually, this no longer appears to be true? Passing -1.msecs as the duration gives me an assertion failure:

    core.exception.AssertError@std/concurrency.d(1902): Assertion failure

Took a look in phobos and it appears to be from this line:
https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L1904

If you look at the implementation of receiveTimeout, you'll see that it no longer has these lines from the Stack Overflow answer:

    if( period.isNegative || !m_putMsg.wait( period ) )
        return false;

https://github.com/D-Programming-Language/phobos/blob/master/std/concurrency.d#L824
Re: Concurrency Confusion
There's indeed a good reason: Variant is a kitchen-sink wrapper and tries to declare questionable code for a shared type, and the compiler catches that. Quite an impressive example of the shared type qualifier in action, even though Variant uses a lot of casting, so there's not a lot of type system at work there.
Re: Concurrency Confusion
On Sunday, 9 August 2015 at 21:06:10 UTC, anonymous wrote: On Sunday, 9 August 2015 at 17:43:59 UTC, 岩倉 澪 wrote: Afaict it is the best way to do what I'm trying to do, and since the data is mutable and cast to immutable with assumeUnique, casting it back to mutable shouldn't be a problem. Technically casting away immutable might be undefined behaviour and it might be an ugly hack, but I don't see a more idiomatic solution. I think casting to shared and back would be better. Unfortunately, it looks like std.concurrency.send doesn't like shared arrays. I filed an issue: https://issues.dlang.org/show_bug.cgi?id=14893 I agree! I initially tried to cast to shared and back, but when I encountered that compiler error I decided to go with immutable. I assumed that there was a good reason it didn't work, rather than a deficiency in the language. Hopefully the issue can be resolved, leading to a nicer solution. :)
Re: Concurrency Confusion
On Sunday, 9 August 2015 at 17:43:59 UTC, 岩倉 澪 wrote: Afaict it is the best way to do what I'm trying to do, and since the data is mutable and cast to immutable with assumeUnique, casting it back to mutable shouldn't be a problem. Technically casting away immutable might be undefined behaviour and it might be an ugly hack, but I don't see a more idiomatic solution. I think casting to shared and back would be better. Unfortunately, it looks like std.concurrency.send doesn't like shared arrays. I filed an issue: https://issues.dlang.org/show_bug.cgi?id=14893
Re: Concurrency Confusion
On Saturday, 8 August 2015 at 05:14:20 UTC, Meta wrote: I'm not completely sure that it's bad in this case, but you really shouldn't be casting away immutable. It's undefined behaviour in D. Afaict it is the best way to do what I'm trying to do, and since the data is mutable and cast to immutable with assumeUnique, casting it back to mutable shouldn't be a problem. Technically casting away immutable might be undefined behaviour and it might be an ugly hack, but I don't see a more idiomatic solution.
Re: Concurrency Confusion
On Saturday, 8 August 2015 at 06:24:30 UTC, sigod wrote: Use negative value for `receiveTimeout`. http://stackoverflow.com/q/31616339/944911 On Saturday, 8 August 2015 at 13:34:24 UTC, Chris wrote: Note aside: if you only import what you need (say `import std.concurrency : receiveTimeout; std.datetime : msecs`), you can reduce the size of the executable considerably as your program grows. Thanks for the tips!
Re: Concurrency Confusion
On Saturday, 8 August 2015 at 00:39:57 UTC, 岩倉 澪 wrote:
> On Friday, 7 August 2015 at 22:13:35 UTC, 岩倉 澪 wrote:
>> "message" is local to the delegate that receiveTimeout takes. I want to use "message" outside of the delegate in the receiving thread. However, if you send an immutable value from the worker thread, afaict there would be no way to assign it to a global/outer variable without making a mutable copy (expensive!) I haven't really spent much time trying to pass my "message" as mutable via shared yet, but hopefully that could work...
>
> Found the answer to this :)
> http://forum.dlang.org/post/mailman.1706.1340318206.24740.digitalmars-d-le...@puremagic.com
>
> I send the results from my worker thread with assumeUnique, and then simply cast away immutable in the receiving thread like so:
>
>     // (in module scope)
>     Bar[] baz;
>
>     // (in application loop)
>     import std.array;
>     if(baz.empty)
>     {
>         import std.concurrency, std.datetime;
>         receiveTimeout(0.msecs,
>                        (immutable Bar[] bar){ baz = cast(Bar[])bar; });
>     }

Note aside: if you only import what you need (say `import std.concurrency : receiveTimeout; import std.datetime : msecs;`), you can reduce the size of the executable considerably as your program grows.
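
As a small illustration of the selective-import note above (a sketch, not code from the thread): each selective import needs its own `import` statement. `msecs` is taken from `core.time` here, which is where it is defined; `std.datetime` re-exports it. The `pollOnce` helper is hypothetical.

```d
// Selective imports: only the named symbols become visible in this module.
import core.time : msecs;
import std.concurrency : receiveTimeout;

// Hypothetical helper: checks the mailbox once without blocking.
// Returns true only if an int message was waiting.
bool pollOnce()
{
    return receiveTimeout(0.msecs, (int _) {});
}

void main()
{
    // Nothing has been sent to this thread, so the poll finds no message.
    assert(!pollOnce());
}
```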
Re: Concurrency Confusion
On Saturday, 8 August 2015 at 01:24:04 UTC, 岩倉 澪 wrote:
> On Saturday, 8 August 2015 at 00:39:57 UTC, 岩倉 澪 wrote:
>>     receiveTimeout(0.msecs, (immutable Bar[] bar){ baz = cast(Bar[])bar; });
>
> Whoops, that should be:
>
>     receiveTimeout(0.msecs, (immutable(Bar)[] bar){ baz = cast(Bar[])bar; });

Use negative value for `receiveTimeout`.
http://stackoverflow.com/q/31616339/944911
Re: Concurrency Confusion
On Saturday, 8 August 2015 at 00:39:57 UTC, 岩倉 澪 wrote:
> Found the answer to this :)
> http://forum.dlang.org/post/mailman.1706.1340318206.24740.digitalmars-d-le...@puremagic.com
>
> I send the results from my worker thread with assumeUnique, and then simply cast away immutable in the receiving thread like so:
>
>     // (in module scope)
>     Bar[] baz;
>
>     // (in application loop)
>     import std.array;
>     if(baz.empty)
>     {
>         import std.concurrency, std.datetime;
>         receiveTimeout(0.msecs,
>                        (immutable Bar[] bar){ baz = cast(Bar[])bar; });
>     }

I'm not completely sure that it's bad in this case, but you really shouldn't be casting away immutable. It's undefined behaviour in D.
Re: Concurrency Confusion
On Saturday, 8 August 2015 at 00:39:57 UTC, 岩倉 澪 wrote:
>     receiveTimeout(0.msecs, (immutable Bar[] bar){ baz = cast(Bar[])bar; });

Whoops, that should be:

    receiveTimeout(0.msecs, (immutable(Bar)[] bar){ baz = cast(Bar[])bar; });
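
One alternative worth sketching, assuming the receiving side only needs to read the elements: declare the outer variable as `immutable(Bar)[]`. A slice of immutable elements can still be reassigned, so no cast and no copy is needed. This is not what the poster did; `worker` and `collect` are hypothetical names.

```d
import core.time : msecs;
import std.concurrency : ownerTid, receiveTimeout, send, spawn;
import std.exception : assumeUnique;

struct Bar { int i; }

// Thread-local; the slice itself is rebindable, only its elements are immutable.
immutable(Bar)[] baz;

// Hypothetical worker: builds a small array and hands it over as immutable.
void worker()
{
    auto result = new Bar[3];
    foreach (i, ref b; result)
        b.i = cast(int) i;
    send(ownerTid, assumeUnique(result));
}

// Hypothetical helper: polls until the result arrives, then returns its length.
size_t collect()
{
    spawn(&worker);
    while (baz.length == 0)
    {
        // No cast: the received slice is assigned directly to the outer variable.
        receiveTimeout(50.msecs, (immutable(Bar)[] bar) { baz = bar; });
        // ... other per-frame work could run here ...
    }
    return baz.length;
}

void main()
{
    assert(collect() == 3);
    assert(baz[2].i == 2);
}
```

If the elements do have to be mutated afterwards, this does not help, and the cast (or a `shared` based design) discussed in the thread is still the question at hand.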
Re: Concurrency Confusion
On Friday, 7 August 2015 at 22:13:35 UTC, 岩倉 澪 wrote:
> "message" is local to the delegate that receiveTimeout takes. I want to use "message" outside of the delegate in the receiving thread. However, if you send an immutable value from the worker thread, afaict there would be no way to assign it to a global/outer variable without making a mutable copy (expensive!) I haven't really spent much time trying to pass my "message" as mutable via shared yet, but hopefully that could work...

Found the answer to this :)
http://forum.dlang.org/post/mailman.1706.1340318206.24740.digitalmars-d-le...@puremagic.com

I send the results from my worker thread with assumeUnique, and then simply cast away immutable in the receiving thread like so:

    // (in module scope)
    Bar[] baz;

    // (in application loop)
    import std.array;
    if(baz.empty)
    {
        import std.concurrency, std.datetime;
        receiveTimeout(0.msecs,
                       (immutable Bar[] bar){ baz = cast(Bar[])bar; });
    }
Re: Concurrency Confusion
On Friday, 7 August 2015 at 15:55:33 UTC, Chris wrote:
> To stop threads immediately, I've found that the best way is to use a shared variable, typically a bool, that is changed only in one place.
> ...
> Unfortunately, sending an abort message to a thread as in `send(thread, true)` takes too long. Setting a global flag like ABORT is instantaneous. Beware of data races though. You might want to have a look at:
> http://ddili.org/ders/d.en/concurrency_shared.html
> Especially `synchronized` and atomicOp.

Ah, I already had a variable like ABORT in my application for signaling the main thread to close, so this was a surprisingly painless change! I made that variable shared and then did the following:

instead of

    ABORT = true;

I now do

    import core.atomic;
    atomicStore!(MemoryOrder.rel)(ABORT, true);

and instead of

    if(ABORT) break;

I now do

    import core.atomic;
    if(atomicLoad!(MemoryOrder.acq)(ABORT)) break;

This works great, and with the memory ordering specified I do not see a noticeable difference in performance, whereas with the default memory ordering my ~36 second processing takes ~38 seconds.

One concern I had was that `break` might be a bad idea inside of a parallel foreach. Luckily, it seems that the author(s) of std.parallelism thought of this - according to the documentation break inside of a parallel foreach throws an exception and some clever exception handling is done under the hood. I don't see an uncaught exception when I close my application, but it is now able to close without having to wait for the worker thread to complete, so everything seems fine and dandy! Thanks for the help!

On Friday, 7 August 2015 at 15:55:33 UTC, Chris wrote:
> receiveTimeout can be used like this:
> ...

My problem is that when you do this:

    received = receiveTimeout(600.msecs,
                              (string message) { // <=== Receiving a value
                                  writeln("received: ", message);
                              });

"message" is local to the delegate that receiveTimeout takes. I want to use "message" outside of the delegate in the receiving thread.

However, if you send an immutable value from the worker thread, afaict there would be no way to assign it to a global/outer variable without making a mutable copy (expensive!) I haven't really spent much time trying to pass my "message" as mutable via shared yet, but hopefully that could work...
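
The flag change described above can be put together as one small program. This is a sketch under assumptions: the worker is a plain `core.thread.Thread` rather than the poster's parallel foreach, and `Thread.sleep` stands in for real work.

```d
import core.atomic : atomicLoad, atomicStore, MemoryOrder;
import core.thread : Thread;
import core.time : msecs;

shared bool ABORT; // written in exactly one place (the owner thread)

// Worker loop: polls the flag with acquire ordering and stops once it is set.
void work()
{
    while (!atomicLoad!(MemoryOrder.acq)(ABORT))
    {
        Thread.sleep(1.msecs); // stand-in for one unit of real work
    }
}

void main()
{
    auto t = new Thread(&work);
    t.start();
    Thread.sleep(10.msecs);

    // Release store: pairs with the acquire load in the worker.
    atomicStore!(MemoryOrder.rel)(ABORT, true);

    t.join(); // returns promptly because the worker sees the flag
    assert(atomicLoad(ABORT));
}
```

Leaving out the `MemoryOrder` arguments falls back to sequentially consistent ordering, which is the default the poster measured as slightly slower.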
Re: Concurrency Confusion
On Friday, 7 August 2015 at 15:55:33 UTC, Chris wrote: Using a shared boolean is probably not the "best way", I should have said the most efficient and reliable way.
Re: Concurrency Confusion
On Thursday, 6 August 2015 at 21:17:15 UTC, 岩倉 澪 wrote:
> On Tuesday, 4 August 2015 at 08:35:10 UTC, Dicebot wrote:
>>     // in real app use `receiveTimeout` to do useful stuff until
>>     // result message is received
>>     auto output = receiveOnly!(immutable(Bar)[]);
>
> New question: how would I receive a immutable value with receiveTimeout? I need the results from my worker thread outside of the delegate that receiveTimeout takes.
>
> Also: what is the best way to kill off the worker thread when I close the application, without having to wait for the worker thread to complete? My first thought was to use receiveTimeout in the worker thread, but the work is being done in a parallel foreach loop, and I am not sure if there is a way to safely use receiveTimeout in a parallel situation...
>
> I also found Thread.isDaemon in core.thread. I tried doing auto thread = Thread.getThis(); thread.isDaemon = true; at the start of the worker thread, but it still seems to wait for it to complete before closing.
>
> Thanks again!

receiveTimeout can be used like this:

    void main()
    {
        spawn(&workerFunc);

        writeln("Waiting for a message");
        bool received = false;
        while (!received)
        {
            received = receiveTimeout(600.msecs,
                                      (string message) { // <=== Receiving a value
                                          writeln("received: ", message);
                                      });

            if (!received)
            {
                writeln("... no message yet");
                /* ... other operations may be executed here ... */
            }
        }
    }

(cf. http://ddili.org/ders/d.en/concurrency.html)

To stop threads immediately, I've found that the best way is to use a shared variable, typically a bool, that is changed only in one place. I hope I'll find the time on Monday to post a simple example.

    1. shared bool ABORT;
    2.
    3. // in owner thread
    4. ABORT = true;  // The only place where you do this.
    5. bool res;
    6. while ((res = receiveOnly!bool()) == false) { debug writeln("waiting for abort ..."); }

    // in worker thread(s)
    foreach ()
    {
        if (ABORT)
            break;
        // working away
    }
    // ...
    ownerTid.send(true);

If you have more than one thread to abort, you'll have to adapt lines 5 and 6 accordingly.

Unfortunately, sending an abort message to a thread as in `send(thread, true)` takes too long. Setting a global flag like ABORT is instantaneous. Beware of data races though. You might want to have a look at:

http://ddili.org/ders/d.en/concurrency_shared.html

Especially `synchronized` and atomicOp.
Re: Concurrency Confusion
On Tuesday, 4 August 2015 at 08:35:10 UTC, Dicebot wrote:
>     // in real app use `receiveTimeout` to do useful stuff until
>     // result message is received
>     auto output = receiveOnly!(immutable(Bar)[]);

New question: how would I receive an immutable value with receiveTimeout? I need the results from my worker thread outside of the delegate that receiveTimeout takes.

Also: what is the best way to kill off the worker thread when I close the application, without having to wait for the worker thread to complete? My first thought was to use receiveTimeout in the worker thread, but the work is being done in a parallel foreach loop, and I am not sure if there is a way to safely use receiveTimeout in a parallel situation...

I also found Thread.isDaemon in core.thread. I tried doing

    auto thread = Thread.getThis();
    thread.isDaemon = true;

at the start of the worker thread, but it still seems to wait for it to complete before closing.

Thanks again!
Re: Concurrency Confusion
On Tuesday, 4 August 2015 at 11:42:54 UTC, Dicebot wrote:
> On Tuesday, 4 August 2015 at 11:33:11 UTC, 岩倉 澪 wrote:
>> On Tuesday, 4 August 2015 at 10:37:39 UTC, Dicebot wrote:
>>> std.concurrency does by-value message passing (in this case just ptr+length), it never deep copies automatically
>>
>> I assumed that it would deep copy (in the case of mutable data) since the data being sent is thread-local (unless I am misunderstanding something)
>
> It is heap-allocated and there is no thread-local heap currently in D - only globals and static variables.
>
> std.concurrency never deep copies - if you are trying to send data which contains indirections (pointers/arrays) _and_ is not marked either immutable or shared, it will simply not compile.

Ahh, thanks for the clarification! That makes a lot of sense
Re: Concurrency Confusion
On Tuesday, 4 August 2015 at 11:33:11 UTC, 岩倉 澪 wrote:
> On Tuesday, 4 August 2015 at 10:37:39 UTC, Dicebot wrote:
>> std.concurrency does by-value message passing (in this case just ptr+length), it never deep copies automatically
>
> I assumed that it would deep copy (in the case of mutable data) since the data being sent is thread-local (unless I am misunderstanding something)

It is heap-allocated and there is no thread-local heap currently in D - only globals and static variables.

std.concurrency never deep copies - if you are trying to send data which contains indirections (pointers/arrays) _and_ is not marked either immutable or shared, it will simply not compile.
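
The "will simply not compile" rule can be checked at compile time. The following is a sketch that assumes `send` rejects mutable indirections through a compile-time check, as described above; the two enum names are made up for illustration.

```d
import std.concurrency : send, Tid;

// Hypothetical probes: does a call to send type-check for these argument types?
enum canSendMutableSlice   = __traits(compiles, send(Tid.init, (int[]).init));
enum canSendImmutableSlice = __traits(compiles, send(Tid.init, (immutable(int)[]).init));

void main()
{
    // A slice of mutable, thread-local data is rejected by the compiler...
    static assert(!canSendMutableSlice);
    // ...while a slice of immutable data is accepted (and never deep copied).
    static assert(canSendImmutableSlice);
}
```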
Re: Concurrency Confusion
On Tuesday, 4 August 2015 at 10:37:39 UTC, Dicebot wrote: std.concurrency does by-value message passing (in this case just ptr+length), it never deep copies automatically I assumed that it would deep copy (in the case of mutable data) since the data being sent is thread-local (unless I am misunderstanding something)
Re: Concurrency Confusion
On Tuesday, 4 August 2015 at 10:29:57 UTC, 岩倉 澪 wrote:
> On Tuesday, 4 August 2015 at 08:35:10 UTC, Dicebot wrote:
>>     auto output = receiveOnly!(immutable(Bar)[]);
>
> Won't message passing like this result in an expensive copy, or does the cast to immutable via assumeUnique avoid that?

immutable data is implicitly shared and doesn't need to be duplicated - as it won't ever be modified, it is perfectly fine to do concurrent reads for that from multiple references.

std.concurrency does by-value message passing (in this case just ptr+length), it never deep copies automatically
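
A quick way to see the "just ptr+length" behaviour is to compare addresses on both sides. In this sketch (the `producer` and `sameMemory` names are hypothetical) the worker sends the slice together with the address of its first element, and the receiver checks that its slice points at the same memory.

```d
import std.concurrency : ownerTid, receiveOnly, send, spawn;
import std.exception : assumeUnique;

struct Bar { int i; }

// Hypothetical producer: builds the array, records where it lives, and sends
// the slice plus that address to the owner thread.
void producer()
{
    auto output = new Bar[100];
    size_t addr = cast(size_t) output.ptr;
    send(ownerTid, assumeUnique(output), addr);
}

// Returns true when the received slice refers to the producer's original
// allocation, i.e. the elements were not copied.
bool sameMemory()
{
    spawn(&producer);
    auto msg = receiveOnly!(immutable(Bar)[], size_t)();
    return msg[0].length == 100 && cast(size_t) msg[0].ptr == msg[1];
}

void main()
{
    assert(sameMemory());
}
```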
Re: Concurrency Confusion
On Tue, 04 Aug 2015 10:29:55 + "岩倉 澪" wrote:
> On Tuesday, 4 August 2015 at 08:35:10 UTC, Dicebot wrote:
> >     auto output = receiveOnly!(immutable(Bar)[]);
>
> Won't message passing like this result in an expensive copy

No, it will copy only the struct containing the length and the pointer to the data, so it is cheap.
Re: Concurrency Confusion
On Tuesday, 4 August 2015 at 08:36:26 UTC, John Colvin wrote:
> Do you mean this instead?
>
>     spawn(&fooPtrToBarArr, foo, bar);

Yep, that was a typo when writing up the post!

> Anyway, you need to use shared, not __gshared, then it should work.

I have been wary of shared because of:
https://p0nce.github.io/d-idioms/#The-truth-about-shared
Re: Concurrency Confusion
On Tuesday, 4 August 2015 at 08:35:10 UTC, Dicebot wrote: auto output = receiveOnly!(immutable(Bar)[]); Won't message passing like this result in an expensive copy, or does the cast to immutable via assumeUnique avoid that?
Re: Concurrency Confusion
    import std.concurrency;
    import std.typecons : Unique;
    import std.exception : assumeUnique;

    struct Foo
    {
    }

    struct Bar
    {
    }

    void bar_generator (Tid ownerTid)
    {
        receive(
            (shared(Foo)* input) {
                auto output = new Bar[100];
                // compute output ..
                // .. and cast to immutable via assumeUnique when
                // finished as it won't be mutated anymore and no
                // other pointers exist
                send(ownerTid, assumeUnique(output));
            }
        );
    }

    void main ()
    {
        auto generator = spawn(&bar_generator, thisTid);

        // must be shared or immutable to be passed between threads
        auto input = new shared Foo;
        send(generator, input);

        // in real app use `receiveTimeout` to do useful stuff until
        // result message is received
        auto output = receiveOnly!(immutable(Bar)[]);

        import std.stdio;
        writeln(output.length);
    }
Re: Concurrency Confusion
On Tuesday, 4 August 2015 at 08:03:54 UTC, 岩倉 澪 wrote:
> Hi all, I'm a bit confused today (as usual, haha).
>
> I have a pointer to a struct (let's call it Foo) allocated via a C library. I need to do some expensive computation with the Foo* to create a Bar[], but I would like to do that computation in the background, because the Bar[] is not needed right away.
>
> I definitely do not want there to be a copy of all elements of the Bar[] between threads, because it is very large.
>
> I tried to implement it like this:
>
>     void fooPtrToBarArr(in shared Foo* f, out shared Bar[] b){ /*do work*/ }
>     __gshared Foo* foo;
>     foo = allocateFoo();
>     __gshared Bar[] bar;
>     spawn(foo, bar);
>
> To my dismay, it results in a cryptic compiler error:
>
>     template std.concurrency.spawn cannot deduce function from argument types
>     !()(void function(shared(const(Foo*)) f, out shared(Bar[]) b), Foo*, Bar[]),
>     candidates are:
>     /usr/include/dlang/dmd/std/concurrency.d(466):
>     std.concurrency.spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
>
> Any help would be greatly appreciated :)

Do you mean this instead?

    spawn(&fooPtrToBarArr, foo, bar);

Anyway, you need to use shared, not __gshared, then it should work. E.g.

    import std.concurrency;

    struct Foo{}
    auto allocateFoo() { return new Foo(); }
    struct Bar{}

    void fooPtrToBarArr(in shared Foo* f, out shared Bar[] b){ /*do work*/ }

    void main()
    {
        shared Foo* foo = cast(shared)allocateFoo();
        shared Bar[] bar;

        spawn(&fooPtrToBarArr, foo, bar);
    }

Then you will have to cast away shared to make use of bar in your normal code (or continue using it as shared, but that would be frustrating). To totally avoid any ordering concerns, you could put a full mfence in before casting away shared (http://dlang.org/phobos/core_atomic.html#.atomicFence).
Re: Concurrency Confusion
On 08/04/2015 01:03 AM, "岩倉 澪" wrote:
> Hi all, I'm a bit confused today (as usual, haha).
>
> I have a pointer to a struct (let's call it Foo) allocated via a C library. I need to do some expensive computation with the Foo* to create a Bar[], but I would like to do that computation in the background, because the Bar[] is not needed right away.
>
> I definitely do not want there to be a copy of all elements of the Bar[] between threads, because it is very large.
>
> I tried to implement it like this:
>
>     void fooPtrToBarArr(in shared Foo* f, out shared Bar[] b){ /*do work*/ }
>     __gshared Foo* foo;
>     foo = allocateFoo();
>     __gshared Bar[] bar;
>     spawn(foo, bar);
>
> To my dismay, it results in a cryptic compiler error:
>
>     template std.concurrency.spawn cannot deduce function from argument types
>     !()(void function(shared(const(Foo*)) f, out shared(Bar[]) b), Foo*, Bar[]),
>     candidates are:
>     /usr/include/dlang/dmd/std/concurrency.d(466):
>     std.concurrency.spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
>
> Any help would be greatly appreciated :)

__gshared behaves like C globals and need not be passed to spawned functions. (Although, if needed, they must be passed as shared and casted back to non-shared in the thread function.)

The following seems to achieve what you describe:

    import std.stdio;
    import std.concurrency;
    import core.thread;

    struct Foo
    {}

    struct Bar
    {
        int i;
    }

    void fooPtrToBarArr()
    {
        bar ~= Bar(42);
    }

    __gshared Foo* foo;
    __gshared Bar[] bar;

    void main()
    {
        spawn(&fooPtrToBarArr);
        thread_joinAll();

        writeln(bar);
    }

Ali