If you want to simply keep some messages in order, then a cat transducer
will do the trick just fine:


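(require '[clojure.core.async :as async :refer [chan <!! >!!]])

;; a transducer can only be supplied together with a buffer or buffer size
(def c (chan 20 cat))

;; cat expects every value put on the channel to be a collection
(>!! c [1])
(>!! c [2 3 4])
(async/close! c)

(<!! (async/into [] c))
;; returns [1 2 3 4]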

In this case, 2, 3 and 4 will always be adjacent because of the way the
channel applies its transducer. A single value comes in, and everything the
transducer produces from that value is spooled into the channel's buffer
before the next put is handled.
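
As a rough sketch of the "extra channel piped through cat" idea from the
quoted messages below: several producer threads put two-element vectors on
one channel, and a second channel with a cat transducer unpacks them for the
consumer. The names run-pairs and pairs-adjacent? are hypothetical helpers
made up for this illustration, and the buffer sizes are arbitrary.

```clojure
(require '[clojure.core.async :as async :refer [chan thread <!! >!!]])

;; Hypothetical helper: starts n-producers threads that each put n-pairs
;; two-element vectors on `in`, and returns everything read from `out`.
(defn run-pairs [n-producers n-pairs]
  (let [in   (chan 16)            ; producers put [first second] vectors here
        out  (chan 16 cat)        ; cat unpacks each vector into single messages
        _    (async/pipe in out)  ; closes `out` once `in` is closed and drained
        done (doall
               (for [p (range n-producers)]
                 (thread
                   (dotimes [i n-pairs]
                     (>!! in [[p i :first] [p i :second]])))))
        ;; start collecting right away so the producers are never stuck
        result (async/into [] out)]
    (doseq [d done] (<!! d))      ; wait for every producer to finish
    (async/close! in)
    (<!! result)))

;; True when every :first message is directly followed by its :second.
(defn pairs-adjacent? [msgs]
  (every? (fn [[[p1 i1 t1] [p2 i2 t2]]]
            (and (= p1 p2) (= i1 i2) (= t1 :first) (= t2 :second)))
          (partition 2 msgs)))

(pairs-adjacent? (run-pairs 4 100))
```

Pairs from different producers can still arrive in any order relative to
each other. Only the two halves of one pair stay together, because each pair
travels as a single value until cat unpacks it.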




On Wed, Oct 11, 2017 at 6:09 AM, Gary Trakhman <gary.trakh...@gmail.com>
wrote:

> If you don't want to split at the consumer I would recommend adding
> another channel, and piping it with a cat xform as mentioned above.
>
> On Oct 11, 2017 3:28 AM, "JokkeB" <jahve...@gmail.com> wrote:
>
>> Thanks. I'm not interested in reading the messages as pairs, I just need
>> to make sure nothing gets in between them. I think I'll choose some way to
>> merge the messages, for example just a vector like you suggested, and then
>> have the end consumer recognise and split the vector. I guess splitting the
>> vector into two messages somewhere before the end consumer can't be made
>> deterministic.
>>
>> On Wednesday, October 11, 2017 at 8:02:40 AM UTC+3, Rangel Spasov wrote:
>>>
>>> I think simply wrapping the two values in a vector is the only thing
>>> that's needed in this case. Simply do a put like (>!! ch [my-val-1
>>> my-val-2]) and take from the channel from another place. If my-val-1 and
>>> my-val-2 are not immediately available, you can have a separate mechanism
>>> (atoms, (loop [...] (recur ...)) or another set of core.async channels
>>> perhaps) that "collects" them until they are ready to be sent together as
>>> one value.
>>>
>>> partition-all on its own does not really help with the ordering here.
>>> For example:
>>>
>>> (let [ch (chan 42 (partition-all 2))]
>>>   (>!! ch :x')
>>>   ;; in another thread far, far away
>>>   (thread
>>>     ;; simulate being busy for a random 0-999 ms
>>>     (<!! (timeout (rand-int 1000)))
>>>     ;; put value on ch
>>>     (>!! ch :y'))
>>>
>>>   ;; simulate being busy for a random 0-999 ms
>>>   (<!! (timeout (rand-int 1000)))
>>>   (>!! ch :x'')
>>>
>>>   ;; take from ch
>>>   (<!! ch))
>>>
>>>
>>> This will non-deterministically output either
>>> => [:x' :x'']
>>>
>>> or
>>>
>>> => [:x' :y']
>>>
>>> But again as I said, you can employ partition-all separately as a
>>> "collecting" mechanism before doing the (>!! ch [my-val-1 my-val-2]).
>>> Didn't write an example for that but let me know if that's not clear and
>>> I can put together a few lines.
>>>
>>> On Tuesday, October 10, 2017 at 7:50:20 AM UTC-7, Gary Trakhman wrote:
>>>>
>>>> So, at the point where you need 2 things to be consecutive, wrap them
>>>> up in a vector so they're one thing. `(partition-all 2)` will return a
>>>> transducer xform that chunks up your message stream by 2
>>>> https://clojuredocs.org/clojure.core/partition-all, and if you need to
>>>> undo it you can do so with the `cat` transducer
>>>> https://clojuredocs.org/clojure.core/cat .
>>>>
>>>> On Tue, Oct 10, 2017 at 10:42 AM JokkeB <jahv...@gmail.com> wrote:
>>>>
>>>>> Thanks for the response. That's what I suspected.
>>>>>
>>>>> How does partition help me in this case? I can't see any way of using
>>>>> it.
>>>>>
>>>>> How about onto-chan, would that be deterministic? Or any other
>>>>> function?
>>>>>
>>>>>
>>>>> On Tuesday, October 10, 2017 at 5:33:22 PM UTC+3, Gary Trakhman wrote:
>>>>>
>>>>>> I think a core assumption is that all writes can yield control at any
>>>>>> time to other worker go processes. I wouldn't rely on the consecutive
>>>>>> behavior even if it were true; nondeterminism is a good assumption with
>>>>>> core.async. If you need that particular guarantee, consider a call to
>>>>>> partition?
>>>>>>
>>>>>> On Tue, Oct 10, 2017 at 10:30 AM JokkeB <jahv...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm wondering about a case when using async:
>>>>>>>
>>>>>>> I have a channel where I write from multiple threads. One of the
>>>>>>> sources is a go-loop with a timeout, so each second I write something to
>>>>>>> the channel. If I do two consecutive writes inside the go block, can
>>>>>>> another thread write between the two writes? Or can I rely on the two
>>>>>>> messages being consecutive in the channel?
>>>>>>>
>>>>>>> --
>>>>>>> You received this message because you are subscribed to the Google
>>>>>>> Groups "Clojure" group.
>>>>>>> To post to this group, send email to clo...@googlegroups.com
>>>>>>> Note that posts from new members are moderated - please be patient
>>>>>>> with your first post.
>>>>>>> To unsubscribe from this group, send email to
>>>>>>> clojure+u...@googlegroups.com
>>>>>>> For more options, visit this group at
>>>>>>> http://groups.google.com/group/clojure?hl=en
>>>>>>> ---
>>>>>>> You received this message because you are subscribed to the Google
>>>>>>> Groups "Clojure" group.
>>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>>> send an email to clojure+u...@googlegroups.com.
>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>
>>>>>
>>
>



-- 
“One of the main causes of the fall of the Roman Empire was that–lacking
zero–they had no way to indicate successful termination of their C
programs.”
(Robert Firth)
