Author: adrian
Date: Sat Mar  5 03:04:13 2011
New Revision: 9339

URL: http://svn.slimdevices.com/jive?rev=9339&view=rev
Log:
Bug: N/A
Description: Move Rtmp protocol implementation to C for all of the packet 
processing.  Previous lua only implementation created lots of lua strings which 
proved a performance issue for high bitrate streams.

Added:
    7.6/trunk/squeezeplay/src/luartmp-squeezeplay/
    7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile
    7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c
Modified:
    7.6/trunk/squeezeplay/src/Makefile.linux
    7.6/trunk/squeezeplay/src/Makefile.squeezeos
    7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua
    7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua
    7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h

Modified: 7.6/trunk/squeezeplay/src/Makefile.linux
URL: 
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/Makefile.linux?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/Makefile.linux (original)
+++ 7.6/trunk/squeezeplay/src/Makefile.linux Sat Mar  5 03:04:13 2011
@@ -158,8 +158,8 @@
 # squeezeplay
 #
 
-.PHONY: app portaudio flac libmad tremor squeezeplay squeezeplay_desktop 
squeezeplay_contrib squeezeplay_private freefont freefont-debian axtls
-app: portaudio flac libmad tremor ${SPPRIVATE_TARGETS} squeezeplay 
squeezeplay_desktop squeezeplay_contrib freefont squeezeplay-tgz
+.PHONY: app portaudio flac libmad tremor rtmp squeezeplay squeezeplay_desktop 
squeezeplay_contrib squeezeplay_private freefont freefont-debian axtls
+app: portaudio flac libmad tremor rtmp ${SPPRIVATE_TARGETS} squeezeplay 
squeezeplay_desktop squeezeplay_contrib freefont squeezeplay-tgz
 
 # portaudio
 portaudio_v19_1360/Makefile:
@@ -207,6 +207,9 @@
 axtls: ${AXTLS_DIR}/Makefile
        cd ${AXTLS_DIR}; make oldconfig && make && make install
 
+rtmp: 
+       cd luartmp-squeezeplay; make
+       cp luartmp-squeezeplay/rtmp.so ${PREFIX}/lib/lua/5.1/rtmp.so 
 
 
 # squeezeplay

Modified: 7.6/trunk/squeezeplay/src/Makefile.squeezeos
URL: 
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/Makefile.squeezeos?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/Makefile.squeezeos (original)
+++ 7.6/trunk/squeezeplay/src/Makefile.squeezeos Sat Mar  5 03:04:13 2011
@@ -138,8 +138,8 @@
 #
 # squeezeplay
 #
-.PHONY: squeezeplay-all portaudio flac libmad tremor squeezeplay 
squeezeplay_jive freefont 
-squeezeplay-all: portaudio flac libmad tremor squeezeplay squeezeplay_jive 
freefont
+.PHONY: squeezeplay-all portaudio flac libmad tremor rtmp squeezeplay 
squeezeplay_jive freefont 
+squeezeplay-all: portaudio flac libmad tremor rtmp squeezeplay 
squeezeplay_jive freefont
 
 
 # portaudio
@@ -169,6 +169,10 @@
 
 tremor: Tremor/Makefile
        cd Tremor; make && make install
+
+rtmp: 
+       cd luartmp-squeezeplay; make
+       cp luartmp-squeezeplay/rtmp.so ${PREFIX}/lib/lua/5.1/rtmp.so 
 
 
 # squeezeplay

Added: 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile
URL: 
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile?rev=9339&view=auto
==============================================================================
--- 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile (added)
+++ 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile Sat Mar  5 03:04:13 
2011
@@ -1,0 +1,7 @@
+# Makefile for C component of Rtmp.lua specifically for squeezeplay
+# This module is included here so that it can be built as a shared library 
rather than statically linked into squeezeplay
+
+MYCFLAGS = -I../squeezeplay/src -I../squeezeplay/src/ui
+
+rtmp.so: rtmp.c
+       $(CC) $(CFLAGS) $(MYCFLAGS) -shared rtmp.c -o rtmp.so

Added: 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c
URL: 
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c?rev=9339&view=auto
==============================================================================
--- 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c (added)
+++ 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c Sat Mar  5 03:04:13 
2011
@@ -1,0 +1,803 @@
+/* 
+   This module provides the C side of the rtmp implementation - see also 
jive.audio.Rtmp.lua
+
+   The module implements a subset of the Adobe RTMP protocol as specified by:
+    http://www.adobe.com/devnet/rtmp/pdf/rtmp_specification_1.0.pdf
+
+   (c) Adrian Smith (Triode), 2009, 2010, 2011, [email protected]
+
+   The protocol state machine and all packet processing is now implemented in 
C to improve performance and resource demands.
+   The remaining lua code is used to create serialised rtmp request packets 
which are passed to the C protocol implementation.
+
+   The implementation makes the following assumptions:
+
+   1) streams use a streamingId of 1 (it ignores the streamingId inside the 
amf0 _result reponse to a createStream message)
+   2) only implements single byte chunk headers (chunk id < 63)
+   3) timestamps are not send in any packets sent to the server (they are 
always set to 0)
+   
+*/
+
+#include "common.h"
+#include "jive.h"
+#include "audio/streambuf.h"
+
+#if defined(WIN32)
+
+#include <winsock2.h>
+
+typedef SOCKET socket_t;
+#define CLOSESOCKET(s) closesocket(s)
+#define SOCKETERROR WSAGetLastError()
+
+#else
+
+typedef int socket_t;
+#define CLOSESOCKET(s) close(s)
+#define SOCKETERROR errno
+
+#endif
+
+extern LOG_CATEGORY *log_audio_decode;
+
+struct stream {
+       socket_t fd;
+       // other stuff we don't use
+};
+
+#define min(x, y) ((x) < (y) ? (x) : (y))
+
+#define INCACHE_SLOTS      8 // slots for caching state for concurrent active 
rtmp chunk channels - set to be higher than seen, which is 5
+#define BUFFER_UNTIL_TS 4500 // on stream start only transition to playing 
state after this timestamp, avoids startup rebuffer
+
+typedef enum {
+       RTMP_IDLE = 0, RTMP_AWAIT_S0 = 1, RTMP_AWAIT_S1 = 2, RTMP_AWAIT_S2 = 3, 
+       RTMP_SENT_CONNECT = 4, RTMP_SENT_CREATE_STREAM = 5, 
RTMP_SENT_FC_SUBSCRIBE = 6, RTMP_SENT_PLAY = 7,
+       RTMP_BUFFERING = 8, RTMP_PLAYING = 9
+} rtmp_state;
+
+static const char *rtmp_state_name[] = {
+       "idle", "awaitS0", "awaitS1", "awaitS2", "sentConnect", 
"sentCreateStream", "sentFCSubscribe", "sentPlay",
+       "Buffering", "Playing"
+};
+
+static rtmp_state state = RTMP_IDLE;
+static u8_t *hs_token = NULL;
+static unsigned recv_chunksize;
+static unsigned recv_bytes;
+static unsigned next_ack;
+static unsigned ack_wind;
+
+struct {
+       u8_t buf[4096*16];
+       u8_t *pos;
+       unsigned len;
+} inbuf;
+
+struct incache_entry {
+       unsigned chan;
+       u8_t type;
+       u8_t *buf;
+       unsigned len;
+       unsigned rem;
+       unsigned ts;
+       unsigned dts;
+};
+
+static struct incache_entry incache[INCACHE_SLOTS];
+
+void change_state(rtmp_state new) {
+       int i;
+       LOG_INFO(log_audio_decode, "rtmp state: %s -> %s", 
rtmp_state_name[state], rtmp_state_name[new]);
+       state = new;
+       // if moving to idle reinit state
+       if (state == RTMP_IDLE) {
+               inbuf.pos = inbuf.buf;
+               inbuf.len = 0;
+               recv_chunksize = 128;
+               recv_bytes = 0;
+               next_ack   = 20480;
+               ack_wind   = 20480;
+               for (i = 0; i < INCACHE_SLOTS; i++) {
+                       if (incache[i].buf) {
+                               free(incache[i].buf);
+                       }
+               }
+               memset(incache, 0, sizeof(struct incache_entry) * 
INCACHE_SLOTS);
+       }
+}
+
+// busywaiting send used to send outbound packets
+// it is assumed these are normally smaller than sndbuf so this does not stall
+void _send(socket_t s, u8_t *buf, size_t len) {
+       int n, stalled = 0;
+       while (len > 0) {
+               n = send(s, buf, len, 0);
+               if (n >= 0) {
+                       len -= n;
+                       buf += n;
+               } else if (SOCKETERROR == EAGAIN || SOCKETERROR == EWOULDBLOCK) 
{
+                       ++stalled;
+                       if (stalled % 10 == 9) {
+                               LOG_ERROR(log_audio_decode, "stalled writing, 
count: %d", stalled);
+                       }
+               } else {
+                       LOG_ERROR(log_audio_decode, "problem writing, error: 
%s", strerror(SOCKETERROR));
+                       break;
+               }
+       }
+}
+
+// send rtmp packets fragmenting if necessary
+// assume all packets have a t0 header (no header compression)
+void send_rtmp(socket_t s, u8_t *buf, size_t len) {
+       u8_t header0 = *buf;
+       
+       if (len >= 12) {
+
+               // first 12 bytes are the t0 header
+               _send(s, buf, 12);
+               buf += 12;
+               len -= 12;
+
+               while (len > 0) {
+                       // fragment into chunks of 128 bytes
+                       size_t chunklen = min(len, 128);
+                       _send(s, buf, chunklen);
+                       buf += chunklen;
+                       len -= chunklen;
+
+                       // add fragment header if more
+                       if (len > 0) {
+                               u8_t header = header0 | 0xc0;
+                               _send(s, &header, 1);
+                       }
+               }
+
+       } else {
+               LOG_ERROR(log_audio_decode, "packet too short");
+       }
+}
+
+int send_handshakeL(lua_State *L) {
+       struct stream *stream;
+       u8_t *ptr;
+       int i;
+
+       stream = lua_touserdata(L, 1);
+
+       // reset rtmp state
+       change_state(RTMP_IDLE);
+
+       if (!hs_token) {
+               hs_token = malloc(1528);
+       }
+
+       for (i = 0, ptr = hs_token; i < 1528; ++i) {
+               *ptr++ = rand() % 256;
+       }
+
+       // c0
+       _send(stream->fd, (u8_t*)"\x03", 1);
+       // c1 header
+       _send(stream->fd, (u8_t*)"\x00\x00\x00\x00\x00\x00\x00\x00", 8);
+       // c1 token
+       _send(stream->fd, hs_token, 1528);
+
+       change_state(RTMP_AWAIT_S0);
+
+       lua_pushboolean(L, TRUE);
+       return 1;
+}
+
+bool rtmp_packet_exists(lua_State *L, const char *name) {
+       bool exists;
+
+       lua_getglobal(L,    "jive");
+       lua_getfield(L, -1, "audio");
+       lua_getfield(L, -1, "Rtmp");
+       lua_getfield(L, -1, "rtmpMessages");
+       lua_getfield(L, -1, name);
+
+       exists = lua_isstring(L, -1);
+
+       lua_pop(L, 5);
+
+       return exists;
+}
+
+void send_rtmp_packet(lua_State *L, const char *name) {
+       struct stream *stream = lua_touserdata(L, 1);;
+       u8_t *packet;
+       size_t len;
+
+       // get preformatted packets from lua
+       lua_getglobal(L,    "jive");
+       lua_getfield(L, -1, "audio");
+       lua_getfield(L, -1, "Rtmp");
+       lua_getfield(L, -1, "rtmpMessages");
+       lua_getfield(L, -1, name);
+
+       if (lua_isstring(L, -1)) {
+               LOG_INFO(log_audio_decode, "sending %s packet", name);
+               packet = (u8_t *)lua_tolstring(L, -1, &len);
+               send_rtmp(stream->fd, packet, len);
+       } else {
+               LOG_INFO(log_audio_decode, "can't find rtmp packet: %s", name);
+       }
+
+       lua_pop(L, 5);
+}
+
+// rtmp packet handlers - return false for error to force stream close
+
+// receive chunk size handler
+bool messageType1(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       recv_chunksize = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 | 
*(buf+3);
+       LOG_INFO(log_audio_decode, "message type 1 - set recv chunk size to: 
%d", recv_chunksize);
+       return true;
+}
+
+// abort channel handler
+bool messageType2(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       LOG_INFO(log_audio_decode, "message type 2 - abort for chunk channel: 
%d", entry->chan);
+       if (entry->buf) {
+               free(entry->buf);
+       }
+       memset(entry, 0, sizeof(struct incache_entry));
+       return true;
+}
+
+// ack received handler
+bool messageType3(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       LOG_INFO(log_audio_decode, "message type 3 - ack received");
+       return true;
+}
+
+// user control message handler
+bool messageType4(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       unsigned event = *(buf) << 8 | *(buf+1);
+       u8_t *data = buf + 2;
+       switch (event) {
+       case 0: LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 0: Stream Begin"); break;
+       case 1: LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 1: EOF - exiting"); 
+               return false;
+               break;
+       case 2: LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 2: Stream Dry"); break;
+       case 4: LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 4: Stream Is Recorded"); break;
+       case 6: 
+               LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 6: Ping Request - sending response");
+               {
+                       struct stream *stream = lua_touserdata(L, 1);
+                       u8_t *packet_template, packet[18];
+                       
+                       packet_template = (u8_t *)
+                               "\x02"             // chan 2, format 0
+                               "\x00\x00\x00"     // timestamp (null)
+                               "\x00\x00\x06"     // length [data should be 4 
bytes]
+                               "\x04"             // type 0x04
+                               "\x00\x00\x00\x00" // streamId 0
+                               "\x00\x07"         // event type 7
+                               "\x00\x00\x00\x00";// (overwrite with data - 4 
bytes)
+                       
+                       memcpy(packet, packet_template, 18);
+                       memcpy(packet + 14, data, 4);
+                       
+                       send_rtmp(stream->fd, packet, 18);
+               }
+               break;
+
+       default: LOG_DEBUG(log_audio_decode, "message type 4 - user control 
message event %d: ignored", event);
+       }
+       return true;
+}
+
+// window ack size handler
+bool messageType5(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       unsigned window = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 | 
*(buf+3);
+       LOG_INFO(log_audio_decode, "message type 5 - window ack size: %d - 
ignored", window);
+       return true;
+}
+
+// set window size handler
+bool messageType6(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       struct stream *stream = lua_touserdata(L, 1);
+       unsigned window = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 | 
*(buf+3);
+       unsigned limit  = *(buf+4);
+       u8_t *packet_template, packet[16];
+
+       // send window ack packet
+       LOG_INFO(log_audio_decode, "message type 6 - set peer BW: %d limit 
type: %d", window, limit);
+
+       packet_template = (u8_t *)
+               "\x02"             // chan 2, format 0
+               "\x00\x00\x00"     // timestamp (null)
+               "\x00\x00\x04"     // length
+               "\x05"             // type 0x05
+               "\x00\x00\x00\x00" // streamId 0
+               "\x00\x00\x00\x00";// (overwrite with window)
+
+       memcpy(packet, packet_template, 16);
+       // buf[0-3] is window - copy it
+       memcpy(packet+12, buf, 4);
+       send_rtmp(stream->fd, packet, 16);
+
+       ack_wind = window / 2;
+       return true;
+}
+
+// audio packet handler
+bool messageType8(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       int n = streambuf_get_freebytes();
+       
+       if (*buf == 0xAF) {
+               static u8_t adts[7]; // adts static header, set by aac config
+               
+               if (*(buf + 1) == 0x01) {
+                       // AAC audio
+                       u8_t header[7];
+                       unsigned framesize = entry->len - 2 + 7;
+                       memcpy(header, adts, 7);
+                       header[3] |= ((framesize >> 11) & 0x03);
+                       header[4] |= ((framesize >>  3) & 0xFF);
+                       header[5] |= ((framesize <<  5) & 0xE0);
+                       LOG_DEBUG(log_audio_decode, "aac audio data: %d 
timestamp: %d", framesize, entry->ts);
+                       if (n > framesize) {
+                               streambuf_feed(header, 7);
+                               streambuf_feed(buf + 2, entry->len - 2);
+                       } else {
+                               LOG_ERROR(log_audio_decode, "panic - not enough 
space in streambuf - case not handled by implementation");
+                               return false;
+                       }
+                       
+               } else if (*(buf + 1) == 0x00) {
+                       // AAC config
+                       unsigned profile  = 1; // hard coded, ignore config
+                       unsigned sr_index = ((*(buf+2) << 8 | *(buf+3)) & 
0x0780) >> 7;
+                       unsigned channels = (*(buf+3) & 0x78) >> 3;
+                       LOG_INFO(log_audio_decode, "aac config: profile: %d 
sr_index: %d channels: %d", profile, sr_index, channels);
+                       adts[0] = 0xFF;
+                       adts[1] = 0xF9;
+                       adts[2] = ((profile << 6) & 0xC0) | ((sr_index << 2) & 
0x3C) | ((channels >> 2) & 0x1);
+                       adts[3] = ((channels << 6) & 0xC0);
+                       adts[4] = 0x00;
+                       adts[5] = ((0x7FF >> 6) & 0x1F);
+                       adts[6] = ((0x7FF << 2) & 0xFC);
+               }
+               
+       } else if ((*buf & 0xF0) == 0x20) {
+               // MP3 audio
+               LOG_DEBUG(log_audio_decode, "mp3 audio data: %d timestamp: %d", 
entry->len - 1, entry->ts);
+               if (n >= entry->len - 1) {
+                       streambuf_feed(buf + 1, entry->len - 1);
+               } else {
+                       LOG_ERROR(log_audio_decode, "panic - not enough space 
in streambuf - case not handled by implementation");
+                       return false;
+               }
+       }
+       
+       if (state < RTMP_PLAYING) {
+
+               bool send_start = false;
+
+               if (state < RTMP_BUFFERING) {
+                       if (!rtmp_packet_exists(L, "live")) {
+                               send_start = true;
+                       } else {
+                               change_state(RTMP_BUFFERING);
+                       }
+               }
+
+               if (state == RTMP_BUFFERING && entry->ts > BUFFER_UNTIL_TS) {
+                       send_start = true;
+               }
+
+               if (send_start) {
+                       // send streamStartEvent to start playback
+                       lua_getglobal(L,    "jive");
+                       lua_getfield(L, -1, "audio");
+                       lua_getfield(L, -1, "Rtmp");
+                       lua_getfield(L, -1, "streamStartEvent");
+                       if (lua_pcall(L, 0, 0, 0) != 0) {
+                               fprintf(stderr, "error running 
streamStartEvent: %s\n", lua_tostring(L, -1));
+                       }
+                       change_state(RTMP_PLAYING);
+               }
+       }
+
+       return true;
+}
+
+// metadata handler
+bool messageType18(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       LOG_INFO(log_audio_decode, "message type 18 - meta");
+
+       // send to server for debug
+       lua_getglobal(L,    "jive");
+       lua_getfield(L, -1, "audio");
+       lua_getfield(L, -1, "Rtmp");
+       lua_getfield(L, -1, "sendMeta");
+       lua_pushlstring(L, (const char *)buf, entry->len);
+       if (lua_pcall(L, 1, 0, 0) != 0) {
+               fprintf(stderr, "error running sendMeta: %s\n", lua_tostring(L, 
-1));
+       }
+       return true;
+}
+
+// helper for messageType20 which returns true if string exists anywhere 
within buf
+bool bufmatch(u8_t *buf, size_t len, const char *string) {
+       unsigned i, string_len, match = 0;
+       string_len = strlen(string);
+       for (i = 0; i < len; i++) {
+               if (*buf++ == string[match]) {
+                       match++;
+               } else {
+                       match = 0;
+               }
+               if (match == string_len) {
+                       return true;
+               }
+       }
+       return false;
+}
+
+// message type 20
+bool messageType20(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+       LOG_INFO(log_audio_decode, "message type 20");
+
+       // send packet to server for debug
+       lua_getglobal(L,    "jive");
+       lua_getfield(L, -1, "audio");
+       lua_getfield(L, -1, "Rtmp");
+       lua_getfield(L, -1, "sendMeta");
+       lua_pushlstring(L, (const char *)buf, entry->len);
+       if (lua_pcall(L, 1, 0, 0) != 0) {
+               fprintf(stderr, "error running sendMeta: %s\n", lua_tostring(L, 
-1));
+       }
+
+       if (bufmatch(buf, entry->len, "_result")) {
+
+               if (state == RTMP_SENT_CONNECT) {
+
+                       LOG_INFO(log_audio_decode, "sending createStream");
+                       send_rtmp_packet(L, "create");
+                       change_state(RTMP_SENT_CREATE_STREAM);
+
+               } else if (state == RTMP_SENT_CREATE_STREAM) {
+
+                       if (rtmp_packet_exists(L, "subscribe")) {
+
+                               LOG_INFO(log_audio_decode, "sending 
FCSubscribe");
+                               send_rtmp_packet(L, "subscribe");
+                               change_state(RTMP_SENT_FC_SUBSCRIBE);
+
+                       } else {
+
+                               LOG_INFO(log_audio_decode, "sending play");
+                               send_rtmp_packet(L, "play");
+                               change_state(RTMP_SENT_PLAY);
+
+                       }
+               }
+
+       } else if (bufmatch(buf, entry->len, "_error")) {
+
+               LOG_WARN(log_audio_decode, "stream error");
+               return false;
+               
+       } else if (bufmatch(buf, entry->len, "onFCSubscribe")) {
+               
+               LOG_INFO(log_audio_decode, "sending play");
+               send_rtmp_packet(L, "play");
+               change_state(RTMP_SENT_PLAY);
+               
+       } else if (bufmatch(buf, entry->len, "onStatus")) {
+               
+               LOG_INFO(log_audio_decode, "onStatus");
+               
+               if (bufmatch(buf, entry->len, "NetStream.Failed") ||
+                       bufmatch(buf, entry->len, "NetStream.Play.Failed") ||
+                       bufmatch(buf, entry->len, 
"NetStream.Play.StreamNotFound") ||
+                       bufmatch(buf, entry->len, 
"NetConnection.Connect.InvalidApp") ||
+                       bufmatch(buf, entry->len, "NetStream.Play.Complete") ||
+                       bufmatch(buf, entry->len, "NetStream.Play.Stop")) {
+                       
+                       LOG_WARN(log_audio_decode, "error status received - 
closing stream");
+                       
+                       return false;
+               }       
+       }
+
+       return true;
+}
+
+int readL(lua_State *L) {
+       struct stream *stream;
+       bool readmore = true;
+       /*
+        * 1: Stream (self)
+        * 2: Playback (self)
+        */
+
+       stream = lua_touserdata(L, 1);
+
+       // shuffle existing data in inbuf to start
+       if (inbuf.len) {
+               memcpy(inbuf.buf, inbuf.pos, inbuf.len);
+               inbuf.pos = inbuf.buf;
+       }
+
+       while (readmore) {
+
+               size_t len;
+               readmore = false;
+
+               // shuffle to the start if only using second half of buffer 
(i.e. don't do each loop)
+               if (inbuf.len && inbuf.pos - inbuf.buf > sizeof(inbuf.buf) / 2) 
{
+                       memcpy(inbuf.buf, inbuf.pos, inbuf.len);
+                       inbuf.pos = inbuf.buf;
+               }
+
+               len = recv(stream->fd, inbuf.pos + inbuf.len, sizeof(inbuf.buf) 
- (inbuf.pos - inbuf.buf + inbuf.len), 0);
+               
+               if (len == -1) {
+                       if (SOCKETERROR == EAGAIN) {
+                               if (inbuf.len == 0) {
+                                       lua_pushinteger(L, 0);
+                                       return 1;
+                               }
+                       } else {
+                               LOG_ERROR(log_audio_decode, "socket closed, 
%s", strerror(SOCKETERROR));
+                               CLOSESOCKET(stream->fd);
+                               lua_pushnil(L);
+                               lua_pushstring(L, strerror(SOCKETERROR));
+                               return 2;
+                       }
+               } else {
+                       inbuf.len  += len;
+                       recv_bytes += len;
+               }
+
+               // handshake phase
+               if (state < RTMP_SENT_CONNECT) {
+                       
+                       if (state == RTMP_AWAIT_S0 && inbuf.len >= 1 && 
*inbuf.pos == 0x03) {
+                               inbuf.pos += 1;
+                               inbuf.len -= 1;
+                               change_state(RTMP_AWAIT_S1);
+                       }
+                       
+                       if (state == RTMP_AWAIT_S1 && inbuf.len >= 1536) {
+                               _send(stream->fd, inbuf.pos, 4);
+                               _send(stream->fd, (unsigned char *) 
"\x00\x00\x00\x00", 4);
+                               _send(stream->fd, inbuf.pos + 8, 1528);
+                               inbuf.pos += 1536;
+                               inbuf.len -= 1536;
+                               change_state(RTMP_AWAIT_S2);                    
        
+                       }
+                       
+                       if (state == RTMP_AWAIT_S2 && inbuf.len >= 1536) {
+                               if (hs_token && !strncmp((char *)(inbuf.pos + 
8), (char *) hs_token, 1528)) {
+                                       free(hs_token);
+                                       hs_token = NULL;
+                                       inbuf.pos += 1536;
+                                       inbuf.len -= 1536;
+                                       send_rtmp_packet(L, "connect");
+                                       change_state(RTMP_SENT_CONNECT);
+                               } else {
+                                       LOG_ERROR(log_audio_decode, "bad 
handshake token");
+                                       CLOSESOCKET(stream->fd);
+                                       lua_pushnil(L);
+                                       lua_pushstring(L, "bad handshake 
token");
+                                       return 2;
+                               }
+                       }
+               }
+               
+               // connected phase
+               if (state >= RTMP_SENT_CONNECT && inbuf.len > 0) {
+                       
+                       unsigned chan = *inbuf.pos & 0x3f;
+                       unsigned fmt  = (*inbuf.pos & 0xc0) >> 6;
+                       u8_t *dpos = NULL;
+                       bool reasembled = false;
+                       struct incache_entry *entry;
+                       int i;
+                       char *error = NULL;
+
+                       // find or create a cache entry for this chan in the 
incache
+                       for (i = 0; i < INCACHE_SLOTS; i++) {
+                               entry = &incache[i];
+                               if (chan == entry->chan) {
+                                       break;
+                               } else if (!entry->chan) {
+                                       entry->chan = chan;
+                                       break;
+                               }
+                       }
+
+                       if (i == INCACHE_SLOTS) {
+                               error = "run out of incache slots";
+                       } else if (chan == 0 || chan == 1) {
+                               error = "rtmp chan > 63 - not supported";
+                       }
+
+                       if (error) {
+                               LOG_ERROR(log_audio_decode, "%s", error);
+                               CLOSESOCKET(stream->fd);
+                               lua_pushnil(L);
+                               lua_pushstring(L, error);
+                               return 2;
+                       }
+
+                       if (fmt == 0 && inbuf.len >= 12) {
+
+                               int t0len = (*(inbuf.pos+4) << 16) | 
(*(inbuf.pos+5) << 8) | *(inbuf.pos+6);
+                               int read  = min(t0len, recv_chunksize) + 12;
+                               
+                               if (inbuf.len >= read) {
+                                       entry->type = *(inbuf.pos + 7);
+                                       entry->len  = t0len;
+                                       entry->ts   = (*(inbuf.pos+1) << 16) | 
(*(inbuf.pos+2) << 8) | *(inbuf.pos+3);
+                                       if (t0len == read - 12) {
+                                               dpos = inbuf.pos + 12;
+                                       } else {
+                                               if (entry->buf) 
free(entry->buf);
+                                               entry->buf  = malloc(t0len);
+                                               memcpy(entry->buf, inbuf.pos + 
12, read - 12);
+                                               entry->rem  = t0len + 12 - read;
+                                       }
+                                       inbuf.pos += read;
+                                       inbuf.len -= read;
+                                       readmore = true;
+                               }
+                               
+                       } else if (fmt == 1 && inbuf.len >= 8) {
+
+                               int t1len = (*(inbuf.pos+4) << 16) | 
(*(inbuf.pos+5) << 8) | *(inbuf.pos+6);
+                               int read  = min(t1len, recv_chunksize) + 8;
+                               
+                               if (inbuf.len >= read) {
+                                       entry->type = *(inbuf.pos + 7);
+                                       entry->len  = t1len;
+                                       entry->dts  = (*(inbuf.pos+1) << 16) | 
(*(inbuf.pos+2) << 8) | *(inbuf.pos+3);
+                                       entry->ts   += entry->dts;
+                                       if (t1len == read - 8) {
+                                               dpos  = inbuf.pos + 8;
+                                       } else {
+                                               if (entry->buf) 
free(entry->buf);
+                                               entry->buf  = malloc(t1len);
+                                               memcpy(entry->buf, inbuf.pos + 
8, read - 8);
+                                               entry->rem  = t1len + 8 - read;
+                                       }
+                                       inbuf.pos += read;
+                                       inbuf.len -= read;
+                                       readmore = true;
+                               }
+                               
+                       } else if (fmt == 2 && entry->type) {
+                               
+                               int t2len = entry->len;
+                               int read  = min(t2len, recv_chunksize) + 4;
+                               
+                               if (inbuf.len >= read) {
+                                       entry->dts  = (*(inbuf.pos+1) << 16) | 
(*(inbuf.pos+2) << 8) | *(inbuf.pos+3);
+                                       entry->ts   += entry->dts;
+                                       if (t2len == read - 4) {
+                                               dpos  = inbuf.pos + 4;
+                                       } else {
+                                               if (entry->buf) 
free(entry->buf);
+                                               entry->buf  = malloc(t2len);
+                                               memcpy(entry->buf, inbuf.pos + 
4, read - 4);
+                                               entry->rem  = t2len + 4 - read;
+                                       }
+                                       inbuf.pos += read;
+                                       inbuf.len -= read;
+                                       readmore = true;
+                               }
+                               
+                       } else if (fmt == 3 && entry->rem) {
+                               
+                               int read  = min(entry->rem, recv_chunksize) + 1;
+                               
+                               if (inbuf.len >= read) {
+                                       // add to existing fragment
+                                       memcpy(entry->buf + entry->len - 
entry->rem, inbuf.pos + 1, read - 1);
+                                       entry->rem -= (read - 1);
+                                       if (!entry->rem) {
+                                               reasembled = true;
+                                       }
+                                       inbuf.pos += read;
+                                       inbuf.len -= read;
+
+                                       readmore = true;
+                               }
+                               
+                       } else if (fmt == 3 && entry->type) {
+                               
+                               int t3len = entry->len;
+                               int read  = min(t3len, recv_chunksize) + 1;
+                               
+                               if (inbuf.len >= read) {
+                                       entry->ts += entry->dts;
+                                       if (t3len == read - 1) {
+                                               dpos = inbuf.pos + 1;
+                                       } else {
+                                               if (entry->buf) 
free(entry->buf);
+                                               entry->buf  = malloc(t3len);
+                                               memcpy(entry->buf, inbuf.pos + 
1, read - 1);
+                                               entry->rem  = t3len + 1 - read;
+                                       }
+                                       inbuf.pos += read;
+                                       inbuf.len -= read;
+
+                                       readmore = true;
+                               }
+                       }
+                       
+                       if (dpos || reasembled) {
+                               u8_t *buf = dpos ? dpos : entry->buf;
+                               bool ok = true;
+
+                               switch(entry->type) {
+                               case  1: ok = messageType1(L, buf, entry); 
break;
+                               case  2: ok = messageType2(L, buf, entry); 
break;
+                               case  3: ok = messageType3(L, buf, entry); 
break;
+                               case  4: ok = messageType4(L, buf, entry); 
break;
+                               case  5: ok = messageType5(L, buf, entry); 
break;
+                               case  6: ok = messageType6(L, buf, entry); 
break;
+                               case  8: ok = messageType8(L, buf, entry); 
break;
+                               case 18: ok = messageType18(L, buf, entry); 
break;
+                               case 20: ok = messageType20(L, buf, entry); 
break;
+                               default:
+                                       LOG_DEBUG(log_audio_decode, "unhandled 
rtmp packet type: %d", entry->type);
+                               }
+
+                               if (!ok) {
+                                       LOG_ERROR(log_audio_decode, "handler 
returned false - closing stream");
+                                       CLOSESOCKET(stream->fd);
+                                       lua_pushnil(L);
+                                       lua_pushstring(L, "handler returned 
false - closing stream");
+                                       return 2;
+                               }
+                       }
+               }
+
+               if (recv_bytes > next_ack) {
+                       u8_t *packet_template, packet[16];
+
+                       LOG_DEBUG(log_audio_decode, "sending ack: %ud", 
recv_bytes);
+
+                       // send ack packet
+                       packet_template = (u8_t *)
+                               "\x02"             // chan 2, format 0
+                               "\x00\x00\x00"     // timestamp (null)
+                               "\x00\x00\x04"     // length
+                               "\x03"             // type 0x03
+                               "\x00\x00\x00\x00" // streamId 0
+                               "\x00\x00\x00\x00";// (overwrite with 
recv_bytes)
+
+                       memcpy(packet, packet_template, 16);
+                       *(packet+12) = (recv_bytes & 0xFF000000) >> 24;
+                       *(packet+13) = (recv_bytes & 0x00FF0000) >> 16;
+                       *(packet+14) = (recv_bytes & 0x0000FF00) >>  8;
+                       *(packet+15) = (recv_bytes & 0x000000FF);
+
+                       send_rtmp(stream->fd, packet, 16);
+                       next_ack += ack_wind;
+               }
+       }
+
+       lua_pushinteger(L, 1);
+       return 1;
+}
+
+static const struct luaL_Reg rtmp_f [] = {
+       { "sendHandshake", send_handshakeL },
+       { "read", readL },
+       { NULL, NULL }
+};
+
+int luaopen_rtmp (lua_State *L) {
+       luaL_register(L, "rtmp", rtmp_f);
+       return 1;
+}

Modified: 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua
URL: 
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua 
(original)
+++ 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua Sat Mar 
 5 03:04:13 2011
@@ -1,5 +1,5 @@
 
-local assert, tostring, type, pairs, ipairs, getmetatable = assert, tostring, 
type, pairs, ipairs, getmetatable
+local assert, tostring, type, pairs, ipairs, getmetatable, require = assert, 
tostring, type, pairs, ipairs, getmetatable, require
 
 
 local oo                     = require("loop.base")
@@ -10,7 +10,6 @@
 local hasDecode, decode      = pcall(require, "squeezeplay.decode")
 local hasSprivate, spprivate = pcall(require, "spprivate")
 local Stream                 = require("squeezeplay.stream")
-local Rtmp                   = require("jive.audio.Rtmp")
 local SlimProto              = require("jive.net.SlimProto")
 local Player                 = require("jive.slim.Player")
 
@@ -118,7 +117,8 @@
                slimproto:capability("Spdirect", cap)
        end
 
-       Rtmp:init(slimproto)
+       -- signal we are Rtmp capable, but don't load module until used
+       slimproto:capability("Rtmp", 2)
 
        self.mode = 0
        self.threshold = 0
@@ -466,7 +466,8 @@
                m.read  = reader
                m.write = writer
        elseif self.flags & 0x20 == 0x20 then
-               -- use Rtmp methods
+               -- use Rtmp methods, load Rtmp module on demand
+               Rtmp = require("jive.audio.Rtmp")
                m.read  = Rtmp.read
                m.write = Rtmp.write
        else

Modified: 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua
URL: 
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua (original)
+++ 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua Sat Mar  5 
03:04:13 2011
@@ -1,30 +1,32 @@
 -- This module implements a subset of the Adobe RTMP protocol as specified by:
 --  http://www.adobe.com/devnet/rtmp/pdf/rtmp_specification_1.0.pdf
 --
--- It provides direct streaming for rtmp in conjunction with the RTMP.pm 
server protocol handler
--- which is contained with the BBCiPlayer plugin.  It is intended for use with 
BBC live and listen
--- again streams as supported by the BBCiPlayer plugin.
---
--- (c) Triode, 2009, [email protected]
+-- It provides direct streaming for rtmp and is primarily designed to be used 
with the BBCRadio applet or BBCiPlayer plugin
+--
+-- (c) Adrian Smith (Triode), 2009, 2010, 2011, [email protected]
 --
 -- The implementation (api v2) here contains both the low level state machine 
for processing the rtmp protocol and
 -- serialisation code for generating amf0 request objects.  Parsing of amf0 
responses is not implemented here and requires
 -- server support.
 --
--- It makes the following assumptions:
+-- The protocol state machine and all packet processing is now implemented in 
C to improve performance and resource demands.
+-- The remaining lua code is used to create serialised rtmp request packets 
which are passed to the C protocol implementation.
+--
+-- The implementation makes the following assumptions:
 --
 -- 1) streams use a streamingId of 1 (it ignores the streamingId inside the 
amf0 _result reponse to a createStream message)
 -- 2) only implements single byte chunk headers (chunk id < 63)
--- 3) does not implement timestamps - a value of 0 is sent in any timestamp 
field sent to the rtmp server
+-- 3) timestamps are not send in any packets sent to the server (they are 
always set to 0)
 --
 -- Due to the way the stream object is created it is necessary to switch 
methods in the stream object's meta table
 -- so that the Rtmp read and write methods here are used (this is done in 
Playback.lua)
+--
 
 local string, table, math, pairs, type = string, table, math, pairs, type
 
 local Stream   = require("squeezeplay.stream")
-
 local mime     = require("mime")
+local rtmpC    = require("rtmp")
 
 local debug    = require("jive.utils.debug")
 local log      = require("jive.utils.log").logger("audio.decode")
@@ -33,87 +35,18 @@
 
 local FLASH_VER  = "LNX 10,0,22,87"
 
+-- C read method
+read = rtmpC.read
 
 -- session params (can't be stored in the object as we reuse the streambuf 
object)
-local inBuf, outBuf, state, token, inCaches, rtmpMessages
-local ackWindow, nextAck, receivedBytes
-local sendChunkSize, recvChunkSize
-local adts1, adts2, adts3, adts4, adts5, adts6, adts7
-
+rtmpMessages = {} -- not local so it can be accessed from C
 local slimproto
-
-
-function init(self, slimprotoObj)
-       slimproto = slimprotoObj
-       -- api version 2 includes abilty to format and serialise amf objects
-       slimproto:capability("Rtmp", 2)
-end
-
-
-local function unpackNumber(str, pos, len, le)
-       local v = 0
-       if le then
-               for i = pos + len - 1, pos, - 1 do
-                       v = (v << 8) | (string.byte(str, i) or 0)
-               end
-       else
-               for i = pos, pos + len - 1 do
-                       v = (v << 8) | (string.byte(str, i) or 0)
-               end
-       end
-       return v
-end
-
-
-local function packNumber(v, len, le)
-       local t = {}
-       for i = 1, len do
-               t[#t + 1] = string.char(v & 0xFF)
-               v = v >> 8
-       end
-       local str = table.concat(t)
-       return le and str or string.reverse(str)
-end
-
-
-local function changeState(newstate)
-       log:info(state, " -> ", newstate)
-       state = newstate
-end
-
-
-function sendRtmp(stream, rtmp)
-       -- send rtmp packets fragmenting if neccessary
-       -- assume all packets have a t0 header (no header compression)
-       local header = string.sub(rtmp, 1, 12)
-       local body   = string.sub(rtmp, 13)
-
-       stream:_streamWrite(nil, header)
-       
-       len = string.len(body)
-
-       while len > 0 do
-               if len > sendChunkSize then
-                       len = sendChunkSize
-               end
-               local chunk = string.sub(body, 1, len)
-               body = string.sub(body, len + 1)
-
-               stream:_streamWrite(nil, chunk)
-               len = string.len(body)
-
-               if len > 0 then
-                       stream:_streamWrite(nil, string.char( 
string.byte(header, 1) | 0xc0 ) )
-               end
-       end
-end
 
 
 function write(stream, playback, header)
        -- initialise
-       inBuf, outBuf, token, inCaches, rtmpMessages, sendChunkSize, 
recvChunkSize = "", "", "", {}, {}, 128, 128
-       ackWindow, nextAck, receivedBytes = 10240, 10240, 0
-       state = "reset"
+       rtmpMessages = {}
+       slimproto = playback.slimproto
 
        -- extract the pre built rtmp packets or params within the header
        for k, v in string.gmatch(header, "(%w+)=([a-zA-Z0-9%/%+]+%=*)&") do
@@ -130,517 +63,42 @@
                end
        end
 
-       -- create the handshake token
-       for i = 1, 1528 do
-               token = token .. string.char(math.random(0,255))
-       end
-
-       -- send RTMP handshake
-       -- c0
-       local c0 = string.char(0x03)
-
-       -- c1 [ assume timestamp of 0 ]
-       local c1 = string.char(0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00) 
.. token
-
-       changeState("hsAwaitS0")
-
-       return stream:_streamWrite(nil, c0 .. c1)
-end
-
-
-local hsHanders = {
-
-       hsAwaitS0 = { 1,
-                                 function(stream, s0)
-                                         if string.byte(s0, 1) == 0x03 then
-                                                 return 'hsAwaitS1'
-                                         else
-                                                 log:warn("did not get S0 
response")
-                                                 return nil
-                                         end
-                                 end
-       },
-
-       hsAwaitS1 = { 1536,
-                                 function(stream, s1)
-                                         local time1 = string.sub(s1, 1, 4)
-                                         local mytime = string.char(0x00, 
0x00, 0x00, 0x00)
-                                         local c2 = time1 .. mytime .. 
string.sub(s1, 9)
-                                         stream:_streamWrite(nil, c2)
-                                         return 'hsAwaitS2'
-                                 end
-       },
-
-       hsAwaitS2 = { 1536,
-                                 function(stream, s2)
-                                         local rand = string.sub(s2, 9)
-                                         if rand == token then
-                                                 sendRtmp(stream, 
rtmpMessages["connect"])
-                                                 return 'sentConnect'
-                                         else
-                                                 return nil
-                                         end
-                                 end
-       },
-}
-
-local rtmpHandlers = {
-
-       [1]  = function(stream, rtmp)
-                          recvChunkSize = unpackNumber(rtmp["body"], 1, 4)
-                          log:info("message type 1 - set recv chunk size to ", 
recvChunkSize)
-                  end,
-
-       [2]  = function(stream, rtmp)
-                          log:info("message type 2 - abort for chunk channel 
", rtmp["chan"])
-                          inCaches[ rtmp["chan"] ] = {}
-                  end,
-
-       [3]  = function(stream, rtmp)
-                          log:info("ack received")
-                  end,
-
-       [4]  = function(stream, rtmp)
-                          local event = unpackNumber(rtmp["body"], 1, 2)
-                          local data  = string.sub(rtmp["body"], 3)
-
-                          if event == 0 then
-                                  log:info("message type 4 - user control 
message ", event, ": Stream Begin")
-                          elseif event == 1 then
-                                  log:info("message type 4 - user control 
message ", event, ": EOF - exiting")
-                                  return false, true
-                          elseif event == 2 then
-                                  log:info("message type 4 - user control 
message ", event, ": Stream Dry")
-                          elseif event == 4 then
-                                  log:info("message type 4 - user control 
message ", event, ": Stream Is Recorded")
-                          elseif event == 6 then
-                                  log:info("message type 4 - user control 
message ", event, ": Ping Request - sending response")
-                                  sendRtmp(stream, 
-                                                       string.char(0x02,       
           -- chan 2, format 0
-                                                                               
0x00, 0x00, 0x00,      -- timestamp (not implemented)
-                                                                               
0x00, 0x00, 0x06,      -- length [data should be 4 bytes]
-                                                                               
0x04,                  -- type 0x04
-                                                                               
0x00, 0x00, 0x00, 0x00,-- streamId 0
-                                                                               
0x00, 0x07) ..         -- event type 7
-                                                       data)                   
           -- return data
-
-                          else
-                                  log:debug("message type 4 - user control 
message ", event, ": ignored")
-                          end
-                  end,
-
-       [5]  = function(stream, rtmp)
-                          local window = unpackNumber(rtmp["body"], 1, 4)
-                          log:info("message type 5 - window ack size: ", 
window, " - ignored")
-                  end,
-
-       [6]  = function(stream, rtmp)
-                          local window = unpackNumber(rtmp["body"], 1, 4)
-                          local limit  = unpackNumber(rtmp["body"], 5, 1)
-
-                          log:info("message type 5 - set peer BW: ", window, " 
limit type ", limit, " - sending response")
-                          ackWindow = window / 2
-                          sendRtmp(stream, 
-                                               string.char(0x02,               
        -- chan 2, format 0
-                                                                       0x00, 
0x00, 0x00,           -- timestamp (not implemented)
-                                                                       0x00, 
0x00, 0x04,           -- length
-                                                                       0x05,   
                    -- type 0x05
-                                                                       0x00, 
0x00, 0x00, 0x00,     -- streamId 0
-                                                                       (window 
& 0xFF000000) >> 24,-- window as u32 be
-                                                                       (window 
& 0x00FF0000) >> 16,
-                                                                       (window 
& 0x0000FF00) >>  8,
-                                                                       (window 
& 0x000000FF)
-                                                       ))
-                  end,
-
-       [8]  = function(stream, rtmp)
-                          local byte1, byte2 = string.byte(rtmp["body"], 1, 2)
-                          if not byte1 then
-                                  return 0
-                          end
-
-                          if byte1 == 0xAF and byte2 == 0x01 then
-                                  -- AAC
-                                  --log:debug("message type 8 - AAC audiodata, 
len:", rtmp["length"])
-                                  local framesize = rtmp["length"] - 2 + 7
-                                  local header = string.char(adts1, adts2, 
adts3, 
-                                                                               
          adts4 | ((framesize >> 11) & 0x03),
-                                                                               
          adts5 | ((framesize >>  3) & 0xFF),
-                                                                               
          adts6 | ((framesize <<  5) & 0xE0),
-                                                                               
          adts7)
-
-                                  outBuf = outBuf .. header .. 
string.sub(rtmp["body"], 3)
-
-                          elseif byte1 == 0xAF and byte2 == 0x00 then
-                                  -- AAC Config
-                                  local firstword = unpackNumber(rtmp["body"], 
1, 4)
-                                  local profile   = 1
-                                  local sr_index  = (firstword & 0x00000780) 
>> 7
-                                  local channels  = (firstword & 0x00000078) 
>> 3
-
-                                  log:info("message type 8 - AAC config, 
profile: ", profile, " sr_index: ", sr_index, " channels: ", channels)
-
-                                  adts1 = 0xFF
-                                  adts2 = 0xF9
-                                  adts3 = ((profile << 6) & 0xC0) | ((sr_index 
<< 2) & 0x3C) | ((channels >> 2) & 0x1)
-                                  adts4 = ((channels << 6) & 0xC0)
-                                  adts5 = 0x00
-                                  adts6 = ((0x7FF >> 6) & 0x1F)
-                                  adts7 = ((0x7FF << 2) & 0xFC)
-
-                          elseif (byte1 & 0xF0) == 0x20 then
-                                  -- MP3
-                                  --log:debug("message type 8 - MP3 audiodata, 
len:", rtmp["length"])
-
-                                  outBuf = outBuf .. string.sub(rtmp["body"], 
2)
-                          end
-
-                          if state ~= "Playing" then
-                                  if state ~= "Buffering" then
-                                          if rtmpMessages["meta"] ~= nil then
-                                                  slimproto:send({ opcode = 
"RESP", headers = "" })
-                                          end
-                                          changeState("Buffering")
-                                  end
-                                  -- don't start playing live streams 
immediately as it causes stutter
-                                  if rtmpMessages["subscribe"] and 
rtmp["timestamp"] < 4500 then
-                                          return 0
-                                  else
-                                          changeState("Playing")
-                                  end
-                          end
-
-                          local n = stream:feedFromLua(outBuf)
-                          if n > 0 then
-                                  outBuf = string.sub(outBuf, n + 1)
-                          end
-
-                          return n
-                  end,
-
-       [18] = function(stream, rtmp)
-                          log:info("message type 18 - metadata")
-
-                          if rtmpMessages["meta"] == nil or 
rtmpMessages["meta"] == "send" then
-                                  slimproto:send({ opcode = "META", data = 
rtmp["body"] })
-                          end
-                  end,
-
-       [20] = function(stream, rtmp)
-                          log:info("message type 20")
-
-                          if rtmpMessages["meta"] == nil or 
rtmpMessages["meta"] == "send" then
-                                  slimproto:send({ opcode = "META", data = 
rtmp["body"] })
-                          end
-
-                          if string.match(rtmp["body"], "_result") then
-
-                                  if state == "sentConnect" then
-
-                                          log:info("sending createStream")
-                                          sendRtmp(stream, 
rtmpMessages["create"])
-                                          changeState("sentCreateStream")
-
-                                  elseif state == "sentCreateStream" then
-
-                                          if rtmpMessages["subscribe"] then
-                                                  log:info("sending 
FCSubscribe")
-                                                  sendRtmp(stream, 
rtmpMessages["subscribe"])
-                                                  
changeState("sentFCSubscribe")
-                                          else
-                                                  log:info("sending play")
-                                                  sendRtmp(stream, 
rtmpMessages["play"])
-                                                  changeState("sentPlay")
-                                          end
-                                  end
-
-                          elseif string.match(rtmp["body"], "_error") then
-
-                                  log:warn("stream error")
-                                  return nil, true
-
-                          elseif string.match(rtmp["body"], "onFCSubscribe") 
then
-
-                                  log:info("sending play")
-                                  sendRtmp(stream, rtmpMessages["play"])
-                                  changeState("sentPlay")
-
-                          elseif string.match(rtmp["body"], "onStatus") then
-
-                                  log:info("onStatus")
-
-                                  local error =
-                                          string.match(rtmp["body"], 
"NetStream%.Failed") or
-                                          string.match(rtmp["body"], 
"NetStream%.Play%.Failed") or
-                                          string.match(rtmp["body"], 
"NetStream%.Play%.StreamNotFound") or
-                                          string.match(rtmp["body"], 
"NetConnection%.Connect%.InvalidApp") or
-                                          string.match(rtmp["body"], 
"NetStream%.Play%.Complete") or
-                                          string.match(rtmp["body"], 
"NetStream%.Play%.Stop")
-
-                                  if error then
-                                          return nil, true
-                                  end
-
-                          end
-
-                  end,
-}
-
-
-function read(stream)
-
-       local readmore = true
-       local n = 0
-
-       while readmore do
-
-               -- read new data, contraining the size of our input buffer
-               -- without this check, fast servers can send us too much data 
causing OOM conditions
-               if string.len(inBuf) < 8192 then
-
-                       local new, error = stream:readToLua()
-                       if error then
-                               stream:setStreaming(false)
-                               stream:disconnect()
-                               return
-                       end
-                       if new then
-                               inBuf = inBuf .. new
-                               receivedBytes = receivedBytes + string.len(new)
-                       end
-
-               end
-
-               readmore = false
-
-               local len = string.len(inBuf)
-               
-               -- handshake phase
-               if (state == 'hsAwaitS0' or state == 'hsAwaitS1' or state == 
'hsAwaitS2') then
-                       
-                       local expect  = hsHanders[state][1]
-                       local handler = hsHanders[state][2]
-                       
-                       if len >= expect then
-
-                               local packet = string.sub(inBuf, 1, expect)
-                               inBuf = string.sub(inBuf, expect + 1)
-
-                               local newstate = handler(stream, packet)
-                               if not newstate then
-                                       stream:setStreaming(false)
-                                       stream:disconnect()
-                                       return
-                               end
-
-                               changeState(newstate)
-
-                               readmore = true
-
-                       end
-
-               -- rtmp parsing phase   
-               elseif len > 0 then
-               
-                       local header0 = string.byte(inBuf, 1)
-                       local chan    = header0 & 0x3f
-                       local fmt     = (header0 & 0xc0) >> 6
-                       local info, body
-
-                       if not inCaches[chan] then
-                               inCaches[chan] = {}
-                       end
-                       local inCache = inCaches[chan]
-
-                       if (chan == 0 or chan == 1) then
-                               log:error("rtmp chan > 63 - not supported")
-                               stream:setStreaming(false)
-                               stream:disconnect()
-                               return
-                       end
-
-                       if fmt == 0 and len >= 12 then
-
-                               local t0len = unpackNumber(inBuf, 5, 3)
-                               local read = math.min(t0len, recvChunkSize) + 12
-
-                               if len >= read then
-
-                                       info = {
-                                               chunkChan = chan,
-                                               type      = string.byte(inBuf, 
8),
-                                               timestamp = unpackNumber(inBuf, 
2, 3),
-                                               length    = t0len,
-                                               streamId  = unpackNumber(inBuf, 
9, 4, true)
-                                       }
-
-                                       local frag = string.sub(inBuf, 13, read)
-                                       inBuf = string.sub(inBuf, read + 1)
-
-                                       if read == t0len + 12 then
-                                               body = frag
-                                       else
-                                               inCaches[chan] = {
-                                                       info   = info,
-                                                       body   = frag,
-                                                       remain = t0len + 12 - 
read
-                                               }
-                                               inCache = inCache[chan]
-                                       end
-
-                                       readmore = true
-                               end
-
-                       elseif fmt == 1 and len >= 8 and inCache["info"] then
-
-                               local t1len = unpackNumber(inBuf, 5, 3)
-                               local read = math.min(t1len, recvChunkSize) + 8
-
-                               if len >= read then
-
-                                       local delta = unpackNumber(inBuf, 2, 3)
-
-                                       info = inCache["info"]
-                                       info["type"] = string.byte(inBuf, 8)
-                                       info["timestamp"] = info["timestamp"] + 
delta
-                                       info["length"] = t1len
-                                       -- streamId is cached
-                                       info["delta"] = delta
-
-                                       local frag = string.sub(inBuf, 9, read)
-                                       inBuf = string.sub(inBuf, read + 1)
-
-                                       if read == t1len + 8 then
-                                               body = frag
-                                       else
-                                               inCache["body"] = frag
-                                               inCache["remain"] = t1len + 8 - 
read
-                                       end
-
-                                       readmore = true
-                               end
-
-                       elseif fmt == 2 and inCache["info"] then
-
-                               local t2len = inCache["info"]["length"]
-                               local read = math.min(t2len, recvChunkSize) + 4
-
-                               if len >= read then
-
-                                       local delta = unpackNumber(inBuf, 2, 3)
-
-                                       info = inCache["info"]
-                                       info["timestamp"] = info["timestamp"] + 
delta
-                                       -- type, length, streamId is cached
-                                       info["delta"] = delta
-
-                                       local frag = string.sub(inBuf, 5, read)
-                                       inBuf = string.sub(inBuf, read + 1)
-
-                                       if read == t2len + 4 then
-                                               body = frag
-                                       else
-                                               inCache["body"] = frag
-                                               inCache["remain"] = t2len + 4 - 
read
-                                       end
-
-                                       readmore = true
-                               end
-
-                       elseif fmt == 3 and inCache["remain"] and 
inCache["remain"] > 0 then
-
-                               local read = math.min(inCache["remain"], 
recvChunkSize) + 1
-
-                               if len >= read then
-
-                                       local frag = string.sub(inBuf, 2, read)
-                                       inBuf = string.sub(inBuf, read + 1)
-
-                                       inCache["body"] = inCache["body"] .. 
frag
-                                       inCache["remain"] = inCache["remain"] - 
(read - 1)
-
-                                       if inCache["remain"] == 0 then
-                                               info = inCache["info"]
-                                               body = inCache["body"]
-                                       end
-
-                                       readmore = true
-                               end
-
-                       elseif fmt == 3 and inCache["info"] then
-
-                               local t3len = inCache["info"]["length"]
-                               local read = math.min(t3len, recvChunkSize) + 1
-
-                               if len >= read then
-                               
-                                       info = inCache["info"]
-                                       info["timestamp"] = info["timestamp"] + 
info["delta"]
-                                       -- type, length, streamId is cached
-
-                                       local frag = string.sub(inBuf, 2, read)
-                                       inBuf = string.sub(inBuf, read + 1)
-
-                                       if read == t3len + 1 then
-                                               body = frag
-                                       else
-                                               inCache["body"] = frag
-                                               inCache["remain"] = t3len + 1 - 
read
-                                       end
-
-                                       readmore = true
-                               end
-
-                       end
-
-                       if body then
-                               local rtmp = info
-                               inCache["info"] = info
-                               rtmp["body"] = body
-
-                               local handler = rtmpHandlers[ rtmp["type"] ]
-                               if handler then
-                                       local ret, error = handler(stream, rtmp)
-                                       if error then
-                                               stream:setStreaming(false)
-                                               stream:disconnect()
-                                               return ret
-                                       end
-                                       if ret then
-                                               n = n + ret
-                                       end
-                               else
-                                       log:warn("unhandled rtmp packet, type: 
", info["type"])
-                               end
-                       end
-
-               end
-
-               if receivedBytes > nextAck then
-                       log:info("sending ack")
-                       sendRtmp(stream, 
-                                        string.char(0x02,                      
 -- chan 2, format 0
-                                                                0x00, 0x00, 
0x00,           -- timestamp (not implemented)
-                                                                0x00, 0x00, 
0x04,           -- length
-                                                                0x03,          
             -- type 0x03
-                                                                0x00, 0x00, 
0x00, 0x00,     -- streamId 0
-                                                                (receivedBytes 
& 0xFF000000) >> 24,
-                                                                (receivedBytes 
& 0x00FF0000) >> 16,
-                                                                (receivedBytes 
& 0x0000FF00) >>  8,
-                                                                (receivedBytes 
& 0x000000FF)
-                                                ))
-                       nextAck = nextAck + ackWindow
-               end
-               
-       end
-
-       return n
-
-end
-
-
+       -- start the rtmp protocol by sending the handshake (implemented in C)
+       return rtmpC.sendHandshake(stream);
+end
+
+
+-- called from C to send RESP message back to server
+function streamStartEvent()
+       if rtmpMessages["meta"] ~= nil then
+               slimproto:send({ opcode = "RESP", headers = "" })
+       end
+end
+
+
+-- called from C to send Metadata back to server
+function sendMeta(msg)
+       if rtmpMessages["meta"] == nil or rtmpMessages["meta"] == "send" then
+               slimproto:send({ opcode = "META", data = msg })
+       end
+end
+
+
+---------------------------------------------------------------------------------------------------------------------
 -- amf packet formatting and serialisation code
 -- this is added for api version 2 so we don't rely on server code to generate 
serialsed amf0
+---------------------------------------------------------------------------------------------------------------------
+
+local function packNumber(v, len, le)
+       local t = {}
+       for i = 1, len do
+               t[#t + 1] = string.char(v & 0xFF)
+               v = v >> 8
+       end
+       local str = table.concat(t)
+       return le and str or string.reverse(str)
+end
+
 
 -- emulate pack "d" to serialise numbers as doubles
 -- this only implements most signficant 28 bits of the mantissa, rest are 0
@@ -715,7 +173,7 @@
 function formatRtmp(chan, type, streamId, body)
        return
                string.char(chan & 0x7f) ..        -- chan X, format 0
-               string.char(0x00, 0x00, 0x00) ..   -- timestamp (not 
implemented)
+               string.char(0x00, 0x00, 0x00) ..   -- timestamp
                packNumber(string.len(body), 3) .. -- length
                string.char(type & 0xff) ..        -- type
                packNumber(streamId, 4, true) ..   -- streamId

Modified: 7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h
URL: 
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h (original)
+++ 7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h Sat Mar  5 
03:04:13 2011
@@ -37,6 +37,8 @@
 
 extern void streambuf_flush(void);
 
+extern void streambuf_feed(u8_t *buf, size_t size);
+
 /* the mutex should be locked when using fast read */
 extern size_t streambuf_fast_read(u8_t *buf, size_t min, size_t max, bool_t 
*streaming);
 

_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/mailman/listinfo/jive-checkins

Reply via email to