Author: ayoung
Date: Mon Nov 28 10:50:48 2011
New Revision: 9557

URL: http://svn.slimdevices.com/jive?rev=9557&view=rev
Log:
bug 17692: Stream proxy capability for SqueezePlay 
Allow chunks handled in streambuf.c to be proxied to additional streams.
Declare capability to server.
Enhance SlimProto capability mechanism to allow a function to be registered as 
a capability. This is needed because the network may not be set up at the time 
the Playback object is created (and may later change), and the local network 
address needs to be passed in the capability.
A previously unused field in the strm-s packet is used to tell Playback the 
number of slave streams expected. 
The data chunks always remain in C data space, with only some small control 
information passing via Lua.

Modified:
    7.7/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua
    7.7/trunk/squeezeplay/src/squeezeplay/share/jive/net/SlimProto.lua
    7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.c
    7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h

Modified: 7.7/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua
URL: 
http://svn.slimdevices.com/jive/7.7/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua?rev=9557&r1=9556&r2=9557&view=diff
==============================================================================
--- 7.7/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua 
(original)
+++ 7.7/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua Mon Nov 
28 10:50:48 2011
@@ -10,12 +10,14 @@
 local hasDecode, decode      = pcall(require, "squeezeplay.decode")
 local hasSprivate, spprivate = pcall(require, "spprivate")
 local Stream                 = require("squeezeplay.stream")
+local socket                 = require("socket") -- for proxy streams
 local SlimProto              = require("jive.net.SlimProto")
 local Player                 = require("jive.slim.Player")
 
 local Task                   = require("jive.ui.Task")
 local Timer                  = require("jive.ui.Timer")
 local Framework              = require("jive.ui.Framework")
+local Networking             = require("jive.net.Networking")
 
 local debug                  = require("jive.utils.debug")
 local log                    = require("jive.utils.log").logger("audio.decode")
@@ -41,9 +43,13 @@
 local TCP_CLOSE_LOCAL_TIMEOUT = 4
 
 
--- Do NOT set a read timeout, the stream may be paused indefinately
+-- Do NOT set a read timeout, the stream may be paused indefinitely
 local STREAM_READ_TIMEOUT = 0
 local STREAM_WRITE_TIMEOUT = 5
+
+local PROXY_WRITE_TIMEOUT = 0 -- because the stream may be paused
+local PROXY_CONNECT_TIMEOUT = STREAM_WRITE_TIMEOUT + 1
+local PROXY_LISTEN_PORT = 9001
 
 local LOCAL_PAUSE_STOP_TIMEOUT = 400
 
@@ -123,6 +129,26 @@
 
        -- signal we are Rtmp capable, but don't load module until used
        slimproto:capability("Rtmp", 2)
+
+       slimproto:capability(function()
+                       local ip_address, ip_subnet
+                       local ifObj = Networking:activeInterface()
+               
+                       if ifObj then
+                               ip_address, ip_subnet = 
ifObj:getIPAddressAndSubnet()
+                               if not ip_address then                          
          
+                                       log:warn('Cannot get ip_address for 
active network interface ', ifObj)
+                               end                                             
                                          
+                       else
+                               log:warn('Cannot find active network interface')
+                       end
+
+                       if ip_address then
+                               return "Proxy", tostring(ip_address) .. ":" .. 
tostring(PROXY_LISTEN_PORT)
+                       else
+                               return nil
+                       end
+               end)
 
        self.mode = 0
        self.threshold = 0
@@ -138,6 +164,9 @@
        self.sentAudioUnderrunEvent = false
        self.ignoreStream = false
        self.decodeThreshold = 2048
+       
+       self.proxy = nil
+       self.proxyListener = nil
 
        return obj
 end
@@ -266,7 +295,7 @@
 
        -- enable stream reads when decode buffer is not full
        if status.decodeFull < status.decodeSize and self.stream then
-               self.jnt:t_addRead(self.stream, self.rtask, STREAM_READ_TIMEOUT)
+               self:_proxyAndStream(true)
        end
 
        if status.decodeState & DECODE_UNDERRUN ~= 0 or
@@ -443,7 +472,7 @@
 end
 
 
-function _streamConnect(self, serverIp, serverPort, reader, writer)
+function _streamConnect(self, serverIp, serverPort, reader, writer, slaves)
        log:info("connect ", _ipstring(serverIp), ":", serverPort, " ", 
string.match(self.header, "(.-)\n"))
 
        if serverIp ~= self.slimproto:getServerIp() then
@@ -483,16 +512,230 @@
                m.read  = m._streamRead
                m.write = m._streamWrite
        end 
+       
+       self:_proxyInit(slaves, self.stream)
 
        local wtask = Task("streambufW", self, _streamWrite, nil, 
Task.PRIORITY_AUDIO)
        self.jnt:t_addWrite(self.stream, wtask, STREAM_WRITE_TIMEOUT)
        
        self.rtask = Task("streambufR", self, _streamRead, nil, 
Task.PRIORITY_AUDIO)
-       self.jnt:t_addRead(self.stream, self.rtask, STREAM_READ_TIMEOUT)
+       self:_proxyAndStream(true)
 
        self.slimproto:sendStatus('STMc')
 end
 
+-- Append a chunk to the queue of data waiting to be sent to the proxy
+-- connections. Does nothing when no proxy is active.
+function _proxyQueueSegment(self, chunk)
+       local proxy = self.proxy
+       if not proxy then
+               return
+       end
+
+       local pending = proxy.q
+       pending[#pending + 1] = chunk
+end
+
+-- Shut down a single proxy connection.
+--   conn: connection table (ip, port, stream, chunk)
+--   err:  optional reason, only used for the log line
+--   leaveConnectionTable: when true the entry is NOT removed from
+--       self.proxy.connections (the caller is iterating over that table)
+function _proxyConnClose(self, conn, err, leaveConnectionTable)
+       local sock = conn.stream
+
+       log:info("Proxy connection closed: from ", conn.ip, ':', conn.port, '; ', err or '')
+
+       self.jnt:t_removeWrite(sock)
+       self.jnt:t_removeRead(sock)
+       sock:close()
+       conn.chunk = nil
+
+       if leaveConnectionTable or not self.proxy then
+               return
+       end
+
+       -- drop the first matching entry from the connection list
+       local list = self.proxy.connections
+       for idx = 1, #list do
+               if list[idx] == conn then
+                       table.remove(list, idx)
+                       return
+               end
+       end
+end
+
+function _proxyWrite(self, conn, networkErr) -- write task body for one proxy connection; networkErr is the error handed in when the task is started, if any
+       if networkErr then -- error on entry: close this connection and stop
+               self:_proxyConnClose(conn, networkErr)
+               return
+       end
+       
+       while true do -- one pass per wake-up of the task
+               local n, err
+               if conn.chunk then
+                       n, err = Stream:proxyWrite(conn.stream, conn.chunk, 
conn.chunkOffset)
+                       if n then -- partial write: n is the offset to resume from on the next pass
+                               conn.chunkOffset = n
+                       else -- chunk completely written, or the write failed (err is then set)
+                               conn.chunk = nil
+                               self.jnt:t_removeWrite(conn.stream)
+                               -- Let reading be restarted by the status timer
+                               -- once the decoder is running.
+                               -- This prevents the streambuf starving the cpu
+                               self:_proxyAndStream(not self.sentResumeDecoder)
+                       end
+               end
+               
+               if err then -- write error: drop this connection and end the task
+                       self:_proxyConnClose(conn, err)
+                       break
+               end
+               
+               _, networkErr = Task:yield(false) -- NOTE(review): the networkErr returned here is not examined inside the loop
+       end
+end
+       
+-- Read task body for one proxy connection. Whatever the client sends (its
+-- request) is read and thrown away; only the number of bytes is logged.
+--   conn:       connection table created by _proxyAccept (uses conn.stream)
+--   networkErr: error handed in when the task is started, if any
+function _proxyRead(self, conn, networkErr)
+       if networkErr then
+               -- report the error we were actually given; the original logged
+               -- 'err', which is not defined in this scope and so was always nil
+               log:info("proxyRead: ", networkErr)
+               return
+       end
+
+       while true do
+               local n, err = conn.stream:receive(10000)
+               if err then
+                       log:info("proxyRead: ", err)
+                       break
+               elseif n then
+                       log:info("proxyRead received ", #n)
+               end
+               _, networkErr = Task:yield(false)
+       end
+
+       -- nothing more to read from this client: stop watching the socket
+       self.jnt:t_removeRead(conn.stream)
+end
+
+function _proxyAccept(self, networkErr) -- listen task body: accepts proxy clients until self.proxy.expected reaches zero or accept() reports an error
+       -- XXX check error (the networkErr passed on entry is overwritten by accept() below without being examined)
+       
+       while true do
+               local stream
+               stream, networkErr = self.proxyListener:accept()
+               
+               if networkErr then -- accept failed: leave the loop and run the failure handling below
+                       log:info("proxyAccept: ", networkErr)
+                       break
+               end
+
+               if stream then
+                       local conn = {} -- per-connection state: ip, port, stream, wtask, rtask (chunk/chunkOffset are set later by _proxyAndStream)
+                       conn.ip, conn.port = stream:getpeername()
+                       log:info("Proxy connection accepted: from ", conn.ip, 
':', conn.port)
+                       conn.stream = stream
+                       stream:settimeout(0)
+                       conn.wtask = Task("proxyW", self, 
+                                       function (self, networkErr) 
self:_proxyWrite(conn, networkErr) end,
+                                       nil, Task.PRIORITY_AUDIO)
+                       conn.rtask = Task("proxyR", self, 
+                                       function (self, networkErr) 
self:_proxyRead(conn, networkErr) end,
+                                       nil, Task.PRIORITY_AUDIO)
+                       self.jnt:t_addRead(conn.stream, conn.rtask, 
STREAM_WRITE_TIMEOUT) -- read and discard the request
+                       table.insert(self.proxy.connections, conn)
+                       
+                       self.proxy.expected = self.proxy.expected - 1 -- one fewer client still to arrive
+                       if self.proxy.expected <= 0 then -- we have them all
+                               log:info("All proxy connections active")
+                               self.jnt:t_removeRead(self.proxyListener)
+                               self.proxy.listenTask = nil -- clearing listenTask is what lets _proxyAndStream proceed
+                               self:_proxyAndStream(true)
+                               return;
+                       end
+               end
+               
+               _, networkErr = Task:yield(false) -- wait until the listener is readable again
+       end
+       
+       self.jnt:t_removeRead(self.proxyListener) -- reached only via the break above
+       self.proxy.listenTask = nil
+       
+       if self.proxy.expected > 0 then -- some clients never connected
+               log:warn('Not all proxy connections accepted: ', networkErr)
+               if self.stream and self.stream == self.proxy.stream then -- only disconnect the stream this proxy set-up belongs to
+                       self:_streamDisconnect(TCP_CLOSE_LOCAL_TIMEOUT)
+               end
+       end
+end
+
+-- Discard the current proxy state, if any: stop listening for new proxy
+-- clients, close every open proxy connection and forget the proxy table.
+function _proxyCleanup(self)
+       local proxy = self.proxy
+       if not proxy then
+               return
+       end
+
+       self.jnt:t_removeRead(self.proxyListener)
+       proxy.listenTask = nil
+
+       -- the whole table is dropped below, so entries are left in place
+       for _, conn in ipairs(proxy.connections) do
+               self:_proxyConnClose(conn, nil, true)
+       end
+
+       self.proxy = nil
+end
+
+-- Prepare proxy state for a newly connected stream.
+--   expected: number of proxy connections to wait for; nil or 0 means none
+--   stream:   the stream object the proxy connections belong to
+-- If the listening socket cannot be created, a warning is logged and no
+-- proxy is set up.
+function _proxyInit(self, expected, stream)
+       if self.proxy then
+               if self.proxy.close and not self.proxy.listenTask then
+                       -- let them drain: only record the new parameters here;
+                       -- _proxyAndStream calls _proxyInit again once the old
+                       -- connections are closed. expected is normalised so its
+                       -- later "expected > 0" comparison never sees nil.
+                       self.proxy.expected = expected or 0
+                       self.proxy.stream = stream
+                       return
+               else
+                       self:_proxyCleanup()
+               end
+       end
+
+       if expected and expected > 0 then
+               log:info("Proxy: connections expected = ", expected)
+
+               if not self.proxyListener then
+                       -- socket.bind returns nil plus a message on failure
+                       -- (e.g. the port is already in use)
+                       local listener, err = socket.bind(0, PROXY_LISTEN_PORT)
+                       if not listener then
+                               log:warn("Proxy: cannot listen on port ", PROXY_LISTEN_PORT, ": ", err)
+                               return
+                       end
+                       listener:settimeout(0)
+                       self.proxyListener = listener
+               end
+
+               local proxy = {}
+               proxy.expected = expected
+               proxy.stream = stream
+               proxy.q = {}
+               proxy.connections = {}
+
+               proxy.listenTask = Task("proxyListen", self, _proxyAccept, nil, Task.PRIORITY_AUDIO)
+               self.jnt:t_addRead(self.proxyListener, proxy.listenTask, PROXY_CONNECT_TIMEOUT)
+
+               self.proxy = proxy
+       end
+end
+
+function _proxyAndStream(self, canRead) -- gate for stream reads: canRead=false removes the stream read handler; canRead=true re-adds it only when the proxy side has nothing pending
+       if not canRead then
+               self.jnt:t_removeRead(self.stream) -- NOTE(review): _streamDisconnect calls this after setting self.stream = nil; confirm t_removeRead tolerates nil
+       end
+
+       if self.proxy then
+               if self.proxy.listenTask then -- still waiting for proxy clients to connect: do not enable reads yet
+                       return
+               end
+               
+               for i, c in ipairs(self.proxy.connections) do
+                       if c.chunk then return end -- a connection is still writing its current chunk
+               end
+                                               
+               if #self.proxy.q > 0 then -- hand the oldest queued chunk to every connection, then wait for those writes
+                       local chunk = table.remove(self.proxy.q, 1)
+                       for i, c in ipairs(self.proxy.connections) do
+                               c.chunk, c.chunkOffset = chunk, 0
+                               self.jnt:t_addWrite(c.stream, c.wtask, 
PROXY_WRITE_TIMEOUT)
+                       end
+                       return
+               end
+               
+               if self.proxy.close then -- queue is empty and a close was requested (set in _streamDisconnect)
+                       for i, c in ipairs(self.proxy.connections) do
+                               self:_proxyConnClose(c, nil, true)
+                       end
+                       self.proxy.close = false
+                       self.proxy.connections = {} -- entries were left in place above, so replace the table
+                       if self.proxy.expected > 0 and self.stream == 
self.proxy.stream then
+                               -- next connection now
+                               self:_proxyInit(self.proxy.expected, 
self.proxy.stream)
+                       end
+               end
+       end
+
+       if canRead then -- nothing pending on the proxy side: resume reading the stream
+               self.jnt:t_addRead(self.stream, self.rtask, STREAM_READ_TIMEOUT)
+       end
+end
+                               
+                               
 
 function _streamDisconnect(self, reason, flush)
        if not self.stream then
@@ -511,6 +754,20 @@
 
        self.stream:disconnect()
        self.stream = nil
+       
+       if self.proxy then
+               -- Abnormal close (any reason other than a normal FIN): drop the
+               -- proxy connections immediately. The comparison must be written
+               -- with ~=, because "not reason == X" parses as "(not reason) == X",
+               -- which is never true for a numeric constant.
+               if reason and reason ~= TCP_CLOSE_FIN then
+                       self:_proxyCleanup()
+               else
+                       -- Normal end of stream: stop accepting new proxy clients
+                       if self.proxy.listenTask then
+                               self.proxy.listenTask = nil
+                               self.jnt:t_removeRead(self.proxyListener)
+                       end
+                       -- Close any proxy connections as soon as they have drained
+                       self.proxy.close = true
+                       self:_proxyAndStream(false)
+               end
+       end
 
        -- Notify SqueezeCenter the stream is closed
        if (flush) then
@@ -553,9 +810,7 @@
                -- stop reading if the decoder is running. the socket will
                -- be added again by the status timer. this prevents the 
                -- streambuf starving the cpu
-               if self.sentResumeDecoder then
-                       self.jnt:t_removeRead(self.stream)
-               end
+               self:_proxyAndStream(not self.sentResumeDecoder)
 
                _, networkErr = Task:yield(false)
 
@@ -701,7 +956,7 @@
                             string.byte(data.pcmChannels),
                             string.byte(data.pcmEndianness)
                    )
-                       self:_streamConnect(serverIp, data.serverPort)
+                       self:_streamConnect(serverIp, data.serverPort, nil, 
nil, data.slaves)
                end
 
        elseif data.command == 'q' then
@@ -757,6 +1012,8 @@
                _setSource(self, "off")
        end
        self:_streamDisconnect(nil, true)
+       
+       self:_proxyCleanup()
 
        self.tracksStarted = 0
 end

Modified: 7.7/trunk/squeezeplay/src/squeezeplay/share/jive/net/SlimProto.lua
URL: 
http://svn.slimdevices.com/jive/7.7/trunk/squeezeplay/src/squeezeplay/share/jive/net/SlimProto.lua?rev=9557&r1=9556&r2=9557&view=diff
==============================================================================
--- 7.7/trunk/squeezeplay/src/squeezeplay/share/jive/net/SlimProto.lua 
(original)
+++ 7.7/trunk/squeezeplay/src/squeezeplay/share/jive/net/SlimProto.lua Mon Nov 
28 10:50:48 2011
@@ -17,7 +17,7 @@
 =cut
 --]]
 
-local assert, ipairs, tonumber = assert, ipairs, tonumber
+local assert, ipairs, tonumber, type = assert, ipairs, tonumber, type
 
 
 local oo          = require("loop.base")
@@ -125,11 +125,25 @@
                        wlanList = wlanList | 0x4000
                end
 
-               local capabilities = table.concat(self.capabilities, ",")
+               local capabilities = {}
+               for i, key in ipairs(self.capabilities) do
+                       if type(key) == 'function' then
+                               local v
+                               key, v = key()
+                               if key and v then
+                                       key = key .. '=' .. v
+                               end
+                       end
+                       if key then
+                               table.insert(capabilities, key)
+                       end
+               end
+
+               capabilities = table.concat(capabilities, ",")
 
                -- always clear the syncgroupid after using it
                for i, key in ipairs(self.capabilities) do
-                       if string.match(key, "SyncgroupID=") then
+                       if type(key) == 'string' and string.match(key, 
"SyncgroupID=") then
                                table.remove(self.capabilities, i)
                                break
                        end
@@ -269,7 +283,7 @@
                        transitionType = string.sub(packet, 15, 15),
                        flags = unpackNumber(packet, 16, 1),
                        outputThreshold = unpackNumber(packet, 17, 1),
-                       -- reserved = unpackNumber(packet, 18, 1),
+                       slaves = unpackNumber(packet, 18, 1),
                        replayGain = unpackNumber(packet, 19, 4),
                        serverPort = unpackNumber(packet, 23, 2),
                        serverIp = unpackNumber(packet, 25, 4),

Modified: 7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.c
URL: 
http://svn.slimdevices.com/jive/7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.c?rev=9557&r1=9556&r2=9557&view=diff
==============================================================================
--- 7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.c (original)
+++ 7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.c Mon Nov 28 
10:50:48 2011
@@ -50,6 +50,37 @@
 static u32_t icy_meta_interval;
 static s32_t icy_meta_remaining;
 
+struct chunk { /* descriptor for a span of bytes to be sent to proxy clients; it holds a pointer only, the data is not copied */
+       u8_t *buf; /* start of the data (proxy_chunk stores its buf argument here) */
+       size_t len; /* number of bytes at buf */
+};
+
+/*
+ * Send chunk to proxy clients.
+ *
+ * Wraps (buf, size) in a struct chunk userdata and passes it to the
+ * _proxyQueueSegment method of the object at Lua stack index 2.
+ * Nothing is done when L is NULL or size is 0.
+ *
+ * Relies on the chunk being sent before wrap-around occurs, which is
+ * ensured by Playback.lua not scheduling any more reads on the stream
+ * until the queued chunk (or, initially, chunks) has been written to
+ * all proxy clients.
+ *
+ * At the start of a stream, there may be up to 3 queued chunks:
+ *      1. the header
+ *      2. the remains of the initial read up until fifo wrap-around
+ *      3. the remains of the initial read after fifo wrap-around
+ */
+static void proxy_chunk (u8_t *buf, size_t size, lua_State *L)
+{
+       struct chunk *desc;
+
+       if (!L || !size) {
+               return;
+       }
+
+       lua_getfield(L, 2, "_proxyQueueSegment");
+       lua_pushvalue(L, 2);
+       desc = lua_newuserdata(L, sizeof(*desc));
+       desc->buf = buf;
+       desc->len = size;
+       lua_call(L, 2, 0);
+}
 
 size_t streambuf_get_size(void) {
        return STREAMBUF_SIZE;
@@ -127,7 +158,7 @@
 }
 
 
-void streambuf_feed(u8_t *buf, size_t size) {
+static void streambuf_feedL(u8_t *buf, size_t size, lua_State *L) {
        size_t n;
 
        fifo_lock(&streambuf_fifo);
@@ -144,6 +175,9 @@
                }
 
                memcpy(streambuf_buf + streambuf_fifo.wptr, buf, n);
+
+               proxy_chunk(streambuf_buf + streambuf_fifo.wptr, n, L);
+
                fifo_wptr_incby(&streambuf_fifo, n);
                size -= n;
        }
@@ -151,8 +185,11 @@
        fifo_unlock(&streambuf_fifo);
 }
 
-
-ssize_t streambuf_feed_fd(int fd) {
+/* Feed data into the stream buffer without a Lua state, so nothing is proxied. */
+void streambuf_feed(u8_t *buf, size_t size) {
+       streambuf_feedL(buf, size, NULL);
+}
+
+ssize_t streambuf_feed_fd(int fd, lua_State *L) {
        ssize_t n, size;
 
        fifo_lock(&streambuf_fifo);
@@ -181,6 +218,8 @@
                streambuf_streaming = FALSE;
        }
        else {
+               proxy_chunk(streambuf_buf + streambuf_fifo.wptr, n, L);
+
                fifo_wptr_incby(&streambuf_fifo, n);
 
                streambuf_bytes_received += n;
@@ -581,7 +620,7 @@
 
        /* shortcut, just read to streambuf */
        if (stream->num_crlf == 4) {
-               n = streambuf_feed_fd(stream->fd);              
+               n = streambuf_feed_fd(stream->fd, L);
                if (n == 0) {
                        /* closed */
                        lua_pushboolean(L, FALSE);
@@ -653,7 +692,7 @@
                        n--;
 
                        if (stream->num_crlf == 4) {
-                               header_len = body_ptr - stream->body - 1;
+                               header_len = body_ptr - stream->body;
 
                                //LOG_DEBUG(log_audio_decode, "headers %d 
%*s\n", header_len, header_len, stream->body);
 
@@ -663,9 +702,12 @@
                                lua_pushlstring(L, (char *)stream->body, 
header_len);
                                lua_call(L, 2, 0);
 
-                               free(stream->body);
-                               stream->body = NULL;
-                               stream->body_len = 0;
+                               /* do not free the header here - leave it to 
disconnect -
+                                * so that it can be used by the proxy code
+                                */
+
+                               /* Send headers to proxy clients */
+                               proxy_chunk(stream->body, header_len, L);
 
                                break;
                        }
@@ -676,7 +718,7 @@
        streambuf_lptr = streambuf_fifo.wptr;
 
        /* feed remaining buffer */
-       streambuf_feed(buf_ptr, n);
+       streambuf_feedL(buf_ptr, n, L);
 
        lua_pushboolean(L, TRUE);
        return 1;
@@ -723,6 +765,43 @@
        */
 
        lua_pushboolean(L, TRUE);
+       return 1;
+}
+
+
+static int stream_proxyWriteL(lua_State *L) { /* Lua binding, registered in the method table as "proxyWrite" */
+       struct stream *stream;
+       struct chunk *chunk;
+       ssize_t n;
+       size_t len, offset;
+
+       /*
+        * 1: Stream (self)
+        * 2: Proxy stream
+        * 3: chunk
+        * 4: offset
+        *
+        * Returns to Lua:
+        *   nil           - the rest of the chunk was written
+        *   nil, message  - send() failed with an error other than EAGAIN
+        *   offset        - bytes of the chunk sent so far (unchanged on
+        *                   EAGAIN); call again with this offset
+        */
+
+       stream = lua_touserdata(L, 2);
+       chunk = lua_touserdata(L, 3);
+       offset = lua_tointeger(L, 4);
+
+       len = chunk->len - offset; /* bytes still to send; NOTE(review): assumes offset <= chunk->len */
+       n = send(stream->fd, chunk->buf + offset, len, MSG_NOSIGNAL);
+       if (n < 0) {
+               if (errno != EAGAIN) { /* real error: report it to the caller */
+                       lua_pushnil(L);
+                       lua_pushstring(L, strerror(SOCKETERROR));
+                       return 2;
+               }
+       } else if ((size_t)n < len) { /* short write: advance the offset */
+               offset += n;
+       } else {
+               /* wrote it all */
+               lua_pushnil(L);
+               return 1;
+       }
+       lua_pushinteger(L, offset); /* reached on EAGAIN or short write */
        return 1;
 }
 
@@ -865,6 +944,7 @@
        { "loadLoop", stream_load_loopL },
        { "markLoop", stream_mark_loopL },
        { "icyMetaInterval", stream_icy_metaintervalL },
+       { "proxyWrite", stream_proxyWriteL },
        { NULL, NULL }
 };
 

Modified: 7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h
URL: 
http://svn.slimdevices.com/jive/7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h?rev=9557&r1=9556&r2=9557&view=diff
==============================================================================
--- 7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h (original)
+++ 7.7/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h Mon Nov 28 
10:50:48 2011
@@ -44,7 +44,7 @@
 
 extern size_t streambuf_read(u8_t *buf, size_t min, size_t max, bool_t 
*streaming);
 
-extern ssize_t streambuf_feed_fd(int fd);
+extern ssize_t streambuf_feed_fd(int fd, lua_State *L);
 
 extern bool_t streambuf_is_copyright();
 

_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/mailman/listinfo/jive-checkins

Reply via email to