On 07Jan2016 12:14, richard kappler <richkapp...@gmail.com> wrote:
On Thu, Jan 7, 2016 at 12:07 PM, James Chapman <ja...@uplinkzero.com> wrote:
From an architectural POV I'd have a few listener threads that upon
receipt would spawn (or take from a pool is a better approach) a worker
thread to process the received data.

As would I.

That's the plan, if I'm understanding you correctly. We've brainstormed the
threading, haven't written any of it yet.
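For reference, here is a minimal sketch of the "take a worker from a pool" variant. It assumes the stdlib concurrent.futures.ThreadPoolExecutor as the pool. `process_packet` and `dispatch` are hypothetical names standing in for whatever per-packet work you actually do.

```python
from concurrent.futures import ThreadPoolExecutor

def process_packet(data: bytes) -> int:
    # hypothetical worker: stands in for the real per-packet processing
    return len(data)

def dispatch(packets, max_workers=4):
    """Hand each received packet to a pooled worker thread instead of
    spawning a fresh thread per packet; results come back in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_packet, p) for p in packets]
        return [f.result() for f in futures]

results = dispatch([b'<a/>', b'<bb/>', b'<ccc/>'])
```

A pool caps the number of worker threads however fast the packets arrive, which is the usual reason to prefer it over spawning a thread per packet.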

The code you've posted should be fine for testing a single connection.

I'd be doing 2 things to what you posted, myself:

 - use plain old file-style reads (.read1() on a stream, rather than socket .recv()) to collect the data and assemble the XML packets

 - decouple your XML parsing from the collection and packet parsing
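To illustrate the second point, here is a minimal producer/consumer sketch using the stdlib queue and threading modules. The hard-coded `packets` list is an assumption standing in for the socket-reading loop, and the element names are made up. The collector only moves complete packets and never parses. The parser only parses and never touches the socket.

```python
import queue
import threading
import xml.etree.ElementTree as ET

def collector(packets, q):
    # collection side: only queues complete packets, never parses them
    for p in packets:
        q.put(p)
    q.put(None)  # sentinel: no more packets

def parser(q, out):
    # parsing side: consumes complete XML packets at its own pace
    while True:
        p = q.get()
        if p is None:
            break
        out.append(ET.fromstring(p).tag)

q = queue.Queue()
tags = []
t = threading.Thread(target=parser, args=(q, tags))
t.start()
collector([b'<event id="1"/>', b'<status/>'], q)
t.join()
```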

To the first, I suspect that when your packets arrive rapidly you are either losing data because a packet bigger than your 8192-byte recv size gets split across two recv calls, or getting multiple logical packets stuffed into one buffer:

 recv #1:
   \x02xml...\x03\x02partial-xml

 recv #2:
   tail-of-previous-xml\x03\x02more-xml...

which would definitely make your XML parser unhappy.
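To make the failure concrete, here is a minimal push-style sketch. It is a different technique from the pull-style generator recommended in this reply. It keeps leftover bytes between recv calls, so the two recv buffers above still yield whole packets. `Framer` is a hypothetical name, and it assumes the \x02/\x03 delimiters never occur inside the XML itself.

```python
STX, ETX = b'\x02', b'\x03'

class Framer:
    """Feed it whatever recv() returned; get back the complete
    STX...ETX payloads found so far. Partial data is kept for later."""

    def __init__(self):
        self.buf = bytearray()

    def feed(self, data: bytes) -> list:
        self.buf += data
        frames = []
        while True:
            start = self.buf.find(STX)
            if start < 0:
                self.buf.clear()      # no opening delimiter: discard noise
                return frames
            end = self.buf.find(ETX, start + 1)
            if end < 0:
                del self.buf[:start]  # keep the partial frame for next time
                return frames
            frames.append(bytes(self.buf[start + 1:end]))
            del self.buf[:end + 1]

f = Framer()
first = f.feed(b'\x02<a>1</a>\x03\x02<b>pa')   # like recv #1
second = f.feed(b'rt</b>\x03\x02<c/>')         # like recv #2
```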

Instead, gather the data progressively and emit XML chunks. You've got a TCP stream: the TCPServer class will do an accept and hand you a binary file-like stream from which you can just read bytes, ignoring any arbitrary "packet" sizes. For example (totally untested) using a generator:

 from logging import warning, error

 def xml_extractor(fp):
   ''' Read a continuous stream of bytes from `fp`, yield bytes to be
       parsed elsewhere, one XML packet at a time. An arbitrary size of
       8192 bytes is used to request more data; it doesn't say anything
       about any underlying network packet size.
   '''
   # a (buffer, offset) pair of ungathered data
   buffer = b''
   offset = 0
   # one pass of this outer loop per XML packet
   while True:
     # locate start of XML chunk
     while True:
       if offset >= len(buffer):
         buffer = fp.read1(8192)
         offset = 0
         if not buffer:
           # EOF: exit generator
           return
       # examine the next byte
       b = buffer[offset]
       offset += 1
       if b == 0x02:
         # opening delimiter
         break
       warning("discard byte 0x%02x", b)
     # gather XML chunk
     chunks = []
     while True:
       endpos = buffer.find(b'\x03', offset)
       if endpos < 0:
         # no delimiter, collect entire chunk
         chunks.append(buffer[offset:])
         offset = len(buffer)
       else:
         # collect up to the delimiter
         chunks.append(buffer[offset:endpos])
         offset = endpos + 1
         break
       # keep collecting...
       if offset >= len(buffer):
         buffer = fp.read1(8192)
         offset = 0
         if not buffer:
           error("EOF: incomplete final XML packet found: %r", b''.join(chunks))
           return
     # yield the XML bytes, then go looking for the next packet
     yield b''.join(chunks)
     chunks = None   # release chunks so memory can be freed promptly

This reads bytes into a buffer, locates each 0x02...0x03 pair of boundaries, and yields the bytes in between. Then your main stream decoder just looks like this:

 for xml_bytes in xml_extractor(fp):
   # decode the bytes into a str
   xml_s = xml_bytes.decode('utf-8')
   # ... pass xml_s to your XML parser here ...
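Here is one possible shape for the "pass xml_s to your XML parser" step. It assumes the stdlib xml.etree.ElementTree as the parser and UTF-8 payloads, as in the decode above. `parse_packet` and the `<reading>` element are hypothetical. A malformed packet returns None instead of raising, so one bad packet doesn't kill the whole connection handler.

```python
import xml.etree.ElementTree as ET

def parse_packet(xml_bytes: bytes):
    """Decode one extracted packet and parse it.

    Returns the root Element, or None if the packet is not valid UTF-8
    or not well-formed XML.
    """
    try:
        xml_s = xml_bytes.decode('utf-8')
        return ET.fromstring(xml_s)
    except (UnicodeDecodeError, ET.ParseError):
        return None

good = parse_packet(b'<reading camera="3">42</reading>')
bad = parse_packet(b'<reading>unterminated')
```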

All of this presumes you have a binary file-like object reading from your TCP stream. And since we're suggesting you spawn a Thread per connection, I'm suggesting you use the TCPServer class from the socketserver module, together with its ThreadingMixIn. That gets you a threading TCP server.

Query: do the cameras connect to you, or do you connect to the cameras? I'm presuming the former.

So the surrounding framework would create a TCPServer instance listening on your ip:port, with a request handler class (a StreamRequestHandler subclass) whose handle() method TCPServer calls once per accepted connection. That handler has a .rfile attribute which is a read-only binary stream for reading from the socket, and _that_ is what we refer to as `fp` in the code above.
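To exercise this without real cameras, here is a minimal sketch of a fake camera sender. It assumes the cameras frame each XML document as \x02...\x03, as described above. `send_packets` is a hypothetical helper, and socket.socketpair() stands in for an accepted TCP connection. The receiving side uses socket.makefile('rb'), which is how StreamRequestHandler builds its self.rfile.

```python
import socket

def send_packets(sock, payloads):
    # hypothetical camera-side sender: wraps each XML payload in STX/ETX
    for p in payloads:
        sock.sendall(b'\x02' + p + b'\x03')

# socketpair() stands in for a camera's connection to the server
cam_end, server_end = socket.socketpair()
send_packets(cam_end, [b'<a/>', b'<b/>'])
cam_end.close()  # camera hangs up -> EOF on the server side

# the same kind of binary stream StreamRequestHandler exposes as self.rfile
rfile = server_end.makefile('rb')
received = rfile.read()   # fine here only because the sender has closed
rfile.close()
server_end.close()
```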

Setting up the TCPServer is pretty simple. Lifting the essential bits from some code of my own (again, untested):

 from socketserver import TCPServer, ThreadingMixIn, StreamRequestHandler

 class MyServer(ThreadingMixIn, TCPServer):
   def __init__(self, bind_addr):
     TCPServer.__init__(self, bind_addr, MyRequestHandler)

 class MyRequestHandler(StreamRequestHandler):
   def handle(self):
     # self.rfile is a buffered binary stream reading from the socket
     fp = self.rfile
     for xml_bytes in xml_extractor(fp):
       # decode the bytes into a str
       xml_s = xml_bytes.decode('utf-8')
       # ... pass xml_s to your XML parser here ...

 # start the server
 S = MyServer( ("hostname", 9999) )
 S.serve_forever()
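Here is a self-contained loopback smoke test of the same server shape. It is a sketch under stated assumptions: it binds 127.0.0.1 on port 0 so the OS picks a free port, and it runs serve_forever() in a background thread. `frames` is a hypothetical, simplified stand-in for xml_extractor. The client deliberately splits one packet across two sends.

```python
import socket
import threading
from socketserver import TCPServer, ThreadingMixIn, StreamRequestHandler

received = []
lock = threading.Lock()
done = threading.Event()

def frames(fp):
    # simplified STX/ETX extractor (hypothetical stand-in for xml_extractor)
    buf = b''
    while True:
        data = fp.read1(8192)
        if not data:
            return          # EOF; any unterminated tail is dropped
        buf += data
        while b'\x03' in buf:
            frame, _, buf = buf.partition(b'\x03')
            start = frame.find(b'\x02')
            if start >= 0:
                yield frame[start + 1:]

class Handler(StreamRequestHandler):
    def handle(self):
        for xml_bytes in frames(self.rfile):
            with lock:
                received.append(xml_bytes.decode('utf-8'))
        done.set()

class Server(ThreadingMixIn, TCPServer):
    daemon_threads = True       # handler threads won't block interpreter exit
    allow_reuse_address = True

server = Server(('127.0.0.1', 0), Handler)   # port 0: let the OS choose
port = server.server_address[1]
threading.Thread(target=server.serve_forever, daemon=True).start()

with socket.create_connection(('127.0.0.1', port)) as cam:
    cam.sendall(b'\x02<a/>\x03\x02<b>')   # one whole packet plus a partial one
    cam.sendall(b'x</b>\x03')             # the rest of the partial packet

done.wait(5)        # handler sets this once it sees EOF
server.shutdown()
server.server_close()
```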

One critical bit in the above is the use of .read1() in the xml_extractor function: that calls the underlying stream's .read() method at most once, so that it behaves like a UNIX read() call and may return a "short" read, i.e. fewer bytes than the maximum requested. This is what you need to return data as soon as it is received. By contrast, the traditional Python .read(8192) call will try to gather bytes until it has the full amount asked for (or hits EOF), which means it will block even when a complete XML packet is already waiting. You definitely need read1() for this kind of work.
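Here is a small demonstration of the short-read behaviour. It uses socket.socketpair() as a stand-in for the camera connection (an assumption for illustration) and makefile('rb') for the buffered stream. Only 10 bytes are sent, and read1(8192) returns what has arrived instead of waiting for 8192 bytes.

```python
import socket

a, b = socket.socketpair()
rfile = b.makefile('rb')        # buffered binary stream, like self.rfile

a.sendall(b'\x02<short/>\x03')  # 10 bytes, far fewer than 8192
# read1() makes at most one underlying read and returns what is there now
chunk = rfile.read1(8192)
# rfile.read(8192) here would instead block until 8192 bytes or EOF arrived

rfile.close()
a.close()
b.close()
```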

Cheers,
Cameron Simpson <c...@zip.com.au>
_______________________________________________
Tutor maillist  -  Tutor@python.org
To unsubscribe or change subscription options:
https://mail.python.org/mailman/listinfo/tutor
