On Mon, Jun 15, 2015 at 3:23 AM, Martin Teichmann <
martin.teichm...@gmail.com> wrote:

> Hi Guido,
>
>
>> I am still wondering, what is your actual use case? Multiple coroutines
>> writing to the same stream sounds like the exception, not the common case
>> -- even when you pay careful attention to the appearance of yield-from, you
>> still have the problem that you can't control which of several coroutines
>> runs first. I must assume that you have a protocol over TCP/IP that has
>> some kind of framing convention, and you always ensure that a coroutine
>> writes an entire frame without yielding.
>>
>
> I have a small application that monitors our servers here and sends the
> results of this monitoring to a single socket for analysis. The analysis
> tool runs outside of the server farm and is not as well connected as the
> servers inside the farm are to each other. The analysis tool is supposed
> to log into the monitoring application, tell it which parameters it
> intends to monitor, and then get those parameters sent over that single
> socket using a framed protocol.
>
> Now, for each parameter that I monitor, I write a coroutine (those are
> typically very small, ten lines on average, I guess). That coroutine does
> more or less this:
>
>     while True:
>          data = yield from get_monitored_parameter()
>          writer.write(header)
>          writer.write(data)
>          yield from writer.drain()
>
> I wrote my own drain coroutine (mostly like the locks you proposed) so
> that I can call it from several tasks, and this gives me simple flow
> control. It could be a good idea to add such a lock to the standard
> library drain, but I can also see people thinking that this is too
> special a use case. I added a working patch to this post just in case.
>

Aha, you are logging. That's the other case where relative ordering of
stuff you write needn't matter. :-)
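
For illustration, the per-parameter coroutine described above might look
roughly like the following sketch. The lock argument stands in for Martin's
custom drain coroutine, and get_monitored_parameter() is his placeholder
name; nothing here beyond the quoted loop is code from the thread.

    import asyncio

    # Hypothetical sketch: many small coroutines, one per monitored
    # parameter, all writing frames to the same StreamWriter. Holding the
    # lock across write+drain keeps one frame's output and its flow
    # control together.
    @asyncio.coroutine
    def monitor_parameter(writer, header, write_lock):
        while True:
            data = yield from get_monitored_parameter()  # placeholder
            with (yield from write_lock):
                writer.write(header)       # frame header
                writer.write(data)         # frame payload
                yield from writer.drain()  # wait for the buffer to drain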


>> I wonder if you could just use a bare Transport/Protocol pair instead of a
>> StreamWriter? In that case you'd have to implement pause_writing() and
>> resume_writing() in your Protocol class if you want to have the equivalent
>> of drain(). (But do you? Is there nothing at a higher level in your
>> protocol to prevent your writing coroutines from overwhelming the other end
>> of the socket?)
>>
>
> I'm using StreamWriter because this is what start_server returns...
>

Well, start_server() wraps loop.create_server(). But I get your point,
especially since you also have the login protocol to deal with.
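
For reference, a rough sketch of what the Transport/Protocol route would
involve (the class, its names, and the drain() method are illustrative, not
from the thread):

    import asyncio

    class MonitorProtocol(asyncio.Protocol):
        """Sketch: drain()-style flow control without a StreamWriter."""

        def connection_made(self, transport):
            self.transport = transport
            self._paused = False
            self._drain_waiter = None

        def pause_writing(self):
            # Called by the transport when its write buffer exceeds the
            # high-water mark.
            self._paused = True

        def resume_writing(self):
            # Called when the buffer falls below the low-water mark;
            # wake up anyone blocked in drain().
            self._paused = False
            waiter, self._drain_waiter = self._drain_waiter, None
            if waiter is not None and not waiter.done():
                waiter.set_result(None)

        @asyncio.coroutine
        def drain(self):
            # Equivalent of StreamWriter.drain(): block until writing resumes.
            if self._paused:
                if self._drain_waiter is None:
                    self._drain_waiter = asyncio.Future()
                yield from self._drain_waiter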


> I want to keep things simple; my code is currently 345 lines and works.
> I think the write-drain loop approach is a flow-control concept that is
> really easy to grasp, which is why I went for this solution.
>
> Sure, I could write some handshaking to avoid overwhelming the other end
> of the socket, but TCP already does that for me, so why bother.
>

Fair enough. I guess there is a real danger of overwhelming the socket?
Otherwise you don't even need the drain() call. But it looks like you are
basically logging as fast as you can, so the drain() seems important.


> Greetings
>
> Martin
>
> My promised patch follows:
>
> --- streams2.py 2015-06-11 16:49:14.851855051 +0200
> +++ streams.py  2015-06-15 11:04:35.119700288 +0200
> @@ -13,6 +13,7 @@
>  from . import coroutines
>  from . import events
>  from . import futures
> +from . import locks
>  from . import protocols
>  from .coroutines import coroutine
>  from .log import logger
> @@ -151,6 +152,7 @@
>              self._loop = loop
>          self._paused = False
>          self._drain_waiter = None
> +        self._drain_lock = locks.Lock(loop=self._loop)
>          self._connection_lost = False
>
>      def pause_writing(self):
> @@ -191,13 +193,12 @@
>      def _drain_helper(self):
>          if self._connection_lost:
>              raise ConnectionResetError('Connection lost')
> -        if not self._paused:
> -            return
> -        waiter = self._drain_waiter
> -        assert waiter is None or waiter.cancelled()
> -        waiter = futures.Future(loop=self._loop)
> -        self._drain_waiter = waiter
> -        yield from waiter
> +        with (yield from self._drain_lock):
> +            if not self._paused:
> +                return
> +            waiter = futures.Future(loop=self._loop)
> +            self._drain_waiter = waiter
> +            yield from waiter
>
>
>  class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
>
>
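With the patched _drain_helper, the intended usage would be several tasks
sharing one writer, roughly like this; `writer` and `sources` are
placeholders for the application's connection and data, not names from the
patch:

    import asyncio

    @asyncio.coroutine
    def send_frames(writer, header, frames):
        # Each task writes complete frames and drains; the lock in the
        # patched _drain_helper serializes concurrent drain() calls
        # instead of tripping the assert on self._drain_waiter.
        for data in frames:
            writer.write(header)
            writer.write(data)
            yield from writer.drain()

    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(send_frames(writer, header, frames))
             for header, frames in sources]
    loop.run_until_complete(asyncio.wait(tasks))
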
I still think that this is a relatively rare case and I don't want to
encourage sharing streams between coroutines. Maybe you can submit a patch
to turn the assert into a RuntimeError?
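
For comparison, a patch along those lines would keep _drain_helper
single-waiter and simply fail loudly; a rough sketch of the changed method,
based on the original code quoted in the diff above (coroutine and futures
come from the streams.py module context):

    @coroutine
    def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is not None and not waiter.cancelled():
            # Previously an assert; a RuntimeError tells the caller plainly
            # that two coroutines are draining the same stream concurrently.
            raise RuntimeError('drain() called while another coroutine is '
                               'already waiting for the write buffer to drain')
        waiter = futures.Future(loop=self._loop)
        self._drain_waiter = waiter
        yield from waiter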

-- 
--Guido van Rossum (python.org/~guido)
