A somewhat simpler implementation can be found 
at https://gist.github.com/ztellman/fb64e81d1d7f0b261ccd.  I'm fairly sure 
it's equivalent to Christophe's, but I may be missing some nuance.  At any 
rate, I've found core.async's put! with a completion callback to be a much 
more straightforward way to do interop with non-core.async code.
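
As a rough illustration of the put!-plus-callback approach (pause-reads!, 
resume-reads!, and close-socket! are hypothetical stand-ins for whatever 
your network library actually provides):

    (require '[clojure.core.async :as a])

    (defn on-message
      "Callback invoked by the network library for each incoming message.
      Reads are paused until the channel accepts the value, so the socket's
      own buffers end up exerting backpressure for us."
      [socket ch msg]
      (pause-reads! socket)
      (a/put! ch msg
              (fn [accepted?]
                (if accepted?
                  (resume-reads! socket)      ; value buffered or taken, keep going
                  (close-socket! socket)))))  ; channel closed, stop reading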

And yes, reimplementing a TCP-like ack mechanism on top of WebSockets is 
not something you want to do.  The existing stack will do it better and 
faster than you can.  Just to be clear, this is a large part of why I wrote 
Manifold [1]: its streams can easily be turned into core.async channels, 
but it provides an API designed for interop with other stream mechanisms 
(including synchronous ones like Java's BlockingQueues).  core.async is a 
framework, meaning it brings not only a stream representation but an 
entire execution model; using core.async should be an application-level 
decision, not one made for you by your libraries.
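
For instance, the conversion in both directions is roughly this simple (a 
quick sketch against the current Manifold API; details may shift before 
the release):

    (require '[manifold.stream :as s]
             '[clojure.core.async :as a])

    ;; a core.async channel can be consumed as a Manifold source ...
    (def ch (a/chan 10))
    (def source (s/->source ch))

    ;; ... and a Manifold stream can be drained into a channel
    (def stream (s/stream))
    (def out    (a/chan 10))
    (s/connect stream out)

    ;; puts return a deferred that is realized only once the value has
    ;; been accepted downstream, which is how backpressure is propagated
    @(s/put! stream :message)
    (a/<!! out)    ; => :message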

Zach

[1] https://github.com/ztellman/manifold


On Sunday, October 12, 2014 9:42:32 AM UTC-7, Ryan Waters wrote:
>
> I was just starting to use Sente [1] (which relies on http-kit [2]) and 
> this conversation is a real eye-opener.  Unless a person uses a library 
> that supports backpressure, as mentioned earlier, the transport concern 
> that core.async makes opaque must become an application-level concern.  
> The far side of the communication would have to respond with an 
> application-level acknowledgement for each local send, and the local side 
> would need to stop sending data until acks were received for previously 
> sent data.
>
> E.g., this could be implemented with core.async by using a pair of channels 
> (instead of a single channel) for all 'sends': one channel carries the data, 
> while the other (a 'control' channel) waits for acknowledgement of the data 
> that was put.  This would have the unfortunate side effect of hurting 
> throughput.  A better system would allow a certain number of unacknowledged 
> sends before backing off.  Of course, at that point a person is implementing 
> at the application level what was created for TCP.
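>
> A rough core.async sketch of that windowed-ack idea (the channel names and 
> window size here are made up for illustration):
>
>     (require '[clojure.core.async :as a])
>
>     (def max-unacked 8)
>
>     (defn sender
>       "Forwards messages from msg-ch to data-ch, but parks on ack-ch once
>       max-unacked sends are outstanding."
>       [msg-ch data-ch ack-ch]
>       (a/go-loop [unacked 0]
>         (if (< unacked max-unacked)
>           ;; room in the window: accept either an ack or a new message
>           (a/alt!
>             ack-ch ([ack] (when ack (recur (max 0 (dec unacked)))))
>             msg-ch ([msg] (when msg
>                             (a/>! data-ch msg)
>                             (recur (inc unacked)))))
>           ;; window full: park until the far side acknowledges something
>           (when (a/<! ack-ch)
>             (recur (dec unacked))))))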
>
> Christophe's approach means you at least wouldn't have to do the above, 
> replacing it instead with a per-backend implementation.  I hope somebody 
> else is able to explain it better.
>
> Looking forward to an Aleph rewrite!!
>
> - - -
> [1] https://github.com/ptaoussanis/sente
> [2] https://github.com/http-kit/http-kit
>
> On Sat, Oct 11, 2014 at 8:01 PM, Julian <julian...@gmail.com> wrote:
>
>> Hi Zach, 
>>
>> Thanks for the clarity of thought that went into this post. 
>>
>> Perhaps it is obvious to everyone but me, but I saw this post by 
>> Christophe Grand yesterday that appears to address these concerns:
>> "Back-pressurized interop for core.async" 
>> https://twitter.com/cgrand/status/520566182194450432
>> https://gist.github.com/cgrand/767673242b7f7c27f35a
>>
>> I'm interested to hear if this solves your problem or is about something 
>> else. 
>>
>> Cheers
>> Julian
>>
>>
>> On Wednesday, 8 October 2014 17:00:02 UTC+11, Zach Tellman wrote:
>>>
>>> The reason the thread-per-connection approach is nice is because it 
>>> correctly propagates backpressure.  If we're copying data from a source to 
>>> a sink (let's say reading it in from the network and writing to a file), 
>>> it's possible that the production of data may outstrip the consumption.  If 
>>> this happens, we need to make sure the producer slows down, or we risk 
>>> running out of memory.  In Java, the producer is typically connected to the 
>>> consumer via a blocking queue, and if the queue fills up the producer can't 
>>> send anything more to the consumer.  A Java socket is one such queue, and 
>>> if it fills up it will exert backpressure via TCP.  This will work no 
>>> matter how many queues or other mechanisms separate the producer and 
>>> consumer.
>>>
>>> However, every attempt I've seen to marry core.async to an async network 
>>> stack has been fundamentally broken, in that it doesn't do this.  Often, 
>>> they'll just use 'put!', which works fine until the channel's queue fills 
>>> up, and 1024 pending puts are accumulated, and finally the channel throws 
>>> an exception.  Alternatively, they'll use a blocking put on the channel, 
>>> which means that any backpressure will also extend to whatever other 
>>> connections are sharing that thread or the thread pool.  Note that the 
>>> software that uses core.async in this way may work flawlessly in a wide 
>>> variety of cases, but there's still an intractable failure mode lying in 
>>> wait.
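>>>
>>> For example, with nothing taking from a channel:
>>>
>>>     (require '[clojure.core.async :as a])
>>>
>>>     (def ch (a/chan))   ; unbuffered, and nothing is consuming it
>>>
>>>     ;; each put! that can't complete immediately is queued as a pending
>>>     ;; put, and core.async allows at most 1024 of them per channel
>>>     (dotimes [_ 1025]
>>>       (a/put! ch :msg))
>>>     ;; => AssertionError: No more than 1024 pending puts are allowed ...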
>>>
>>> In some cases, such as http-kit's websocket mechanism, there's no way to 
>>> even exert backpressure (you register a callback, and have no way to 
>>> indicate in your callback that you can't handle more messages).  This means 
>>> that any attempt to use http-kit in conjunction with core.async will be 
>>> subtly but fundamentally broken.  Arguably, even without core.async in the 
>>> equation it's broken.  This is not a good state of affairs.  I'll admit 
>>> that it took me a few failures in production to realize how important 
>>> correct handling of backpressure is, but this isn't something that our 
>>> ecosystem can afford to ignore, especially as Clojure is used for 
>>> larger-scale projects.
>>>
>>> I will note that I am working on a solution to this, in the form of the 
>>> upcoming Aleph release [1].  This will model every network connection via 
>>> streams that can trivially be converted into core.async channels [2], and 
>>> which exert backpressure over TCP wherever necessary without requiring a 
>>> thread per connection.  A formal beta should be available in the near 
>>> future (it's already handling billions of requests a day in production 
>>> without issue).
>>>
>>> Zach
>>>
>>> [1] https://github.com/ztellman/aleph/tree/0.4.0
>>> [2] https://github.com/ztellman/manifold
>>>
>>>
>>>
>>> On Tuesday, October 7, 2014 1:36:16 PM UTC-7, adrian...@mail.yu.edu 
>>> wrote:
>>>>
>>>> It's not about 'safety' (depending on what that means in this context). 
>>>> As Zach pointed out, if you aren't careful about backpressure you can run 
>>>> into performance bottlenecks with unrestrained async IO operations: 
>>>> although they let you code as if you could handle an unlimited number of 
>>>> connections, obviously that isn't true. There is only a finite amount of 
>>>> data that can be buffered in and out of any network interface, according 
>>>> to its hardware. When you don't regulate that, your system will end up 
>>>> spending an inordinate amount of time compensating for it. You don't need 
>>>> to worry about this with "regular IO" because the "thread per connection" 
>>>> abstraction effectively bounds your activity within the acceptable 
>>>> physical constraints of the server. 
>>>>
>>>> On Tuesday, October 7, 2014 2:49:30 PM UTC-4, Brian Guthrie wrote:
>>>>>
>>>>>
>>>>> On Mon, Oct 6, 2014 at 12:10 AM, <adrian...@mail.yu.edu> wrote:
>>>>>
>>>>>> Zach makes an excellent point; I've used AsynchronousSocketChannel and 
>>>>>> its ilk (http://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousServerSocketChannel.html) 
>>>>>> with core.async in the past. Perhaps replacing your direct 
>>>>>> java.net.Sockets with NIO classes that can be given CompletionHandlers 
>>>>>> (http://docs.oracle.com/javase/7/docs/api/java/nio/channels/CompletionHandler.html) 
>>>>>> would be a better fit. 
>>>>>>
>>>>>
>>>>> Once I do some performance instrumentation I'll give that a shot. I 
>>>>> admit that I'm not familiar with all the implications of using the NIO 
>>>>> classes; were I to switch, is it safe to continue using go blocks, or is 
>>>>> it worth explicitly allocating a single thread per socket?
>>>>>
>>>>> Brian
>>>>>
