Author: adrian
Date: Sat Mar 5 03:04:13 2011
New Revision: 9339
URL: http://svn.slimdevices.com/jive?rev=9339&view=rev
Log:
Bug: N/A
Description: Move Rtmp protocol implementation to C for all of the packet
processing. Previous lua only implementation created lots of lua strings which
proved a performance issue for high bitrate streams.
Added:
7.6/trunk/squeezeplay/src/luartmp-squeezeplay/
7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile
7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c
Modified:
7.6/trunk/squeezeplay/src/Makefile.linux
7.6/trunk/squeezeplay/src/Makefile.squeezeos
7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua
7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua
7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h
Modified: 7.6/trunk/squeezeplay/src/Makefile.linux
URL:
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/Makefile.linux?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/Makefile.linux (original)
+++ 7.6/trunk/squeezeplay/src/Makefile.linux Sat Mar 5 03:04:13 2011
@@ -158,8 +158,8 @@
# squeezeplay
#
-.PHONY: app portaudio flac libmad tremor squeezeplay squeezeplay_desktop
squeezeplay_contrib squeezeplay_private freefont freefont-debian axtls
-app: portaudio flac libmad tremor ${SPPRIVATE_TARGETS} squeezeplay
squeezeplay_desktop squeezeplay_contrib freefont squeezeplay-tgz
+.PHONY: app portaudio flac libmad tremor rtmp squeezeplay squeezeplay_desktop
squeezeplay_contrib squeezeplay_private freefont freefont-debian axtls
+app: portaudio flac libmad tremor rtmp ${SPPRIVATE_TARGETS} squeezeplay
squeezeplay_desktop squeezeplay_contrib freefont squeezeplay-tgz
# portaudio
portaudio_v19_1360/Makefile:
@@ -207,6 +207,9 @@
axtls: ${AXTLS_DIR}/Makefile
cd ${AXTLS_DIR}; make oldconfig && make && make install
+rtmp:
+ cd luartmp-squeezeplay; make
+ cp luartmp-squeezeplay/rtmp.so ${PREFIX}/lib/lua/5.1/rtmp.so
# squeezeplay
Modified: 7.6/trunk/squeezeplay/src/Makefile.squeezeos
URL:
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/Makefile.squeezeos?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/Makefile.squeezeos (original)
+++ 7.6/trunk/squeezeplay/src/Makefile.squeezeos Sat Mar 5 03:04:13 2011
@@ -138,8 +138,8 @@
#
# squeezeplay
#
-.PHONY: squeezeplay-all portaudio flac libmad tremor squeezeplay
squeezeplay_jive freefont
-squeezeplay-all: portaudio flac libmad tremor squeezeplay squeezeplay_jive
freefont
+.PHONY: squeezeplay-all portaudio flac libmad tremor rtmp squeezeplay
squeezeplay_jive freefont
+squeezeplay-all: portaudio flac libmad tremor rtmp squeezeplay
squeezeplay_jive freefont
# portaudio
@@ -169,6 +169,10 @@
tremor: Tremor/Makefile
cd Tremor; make && make install
+
+rtmp:
+ cd luartmp-squeezeplay; make
+ cp luartmp-squeezeplay/rtmp.so ${PREFIX}/lib/lua/5.1/rtmp.so
# squeezeplay
Added: 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile
URL:
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile?rev=9339&view=auto
==============================================================================
--- 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile (added)
+++ 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/Makefile Sat Mar 5 03:04:13
2011
@@ -1,0 +1,7 @@
+# Makefile for C component of Rtmp.lua specifically for squeezeplay
+# This module is included here so that it can be built as a shared library
rather than statically linked into squeezeplay
+
+MYCFLAGS = -I../squeezeplay/src -I../squeezeplay/src/ui
+
+rtmp.so: rtmp.c
+ $(CC) $(CFLAGS) $(MYCFLAGS) -shared rtmp.c -o rtmp.so
Added: 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c
URL:
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c?rev=9339&view=auto
==============================================================================
--- 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c (added)
+++ 7.6/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c Sat Mar 5 03:04:13
2011
@@ -1,0 +1,803 @@
+/*
+ This module provides the C side of the rtmp implementation - see also
jive.audio.Rtmp.lua
+
+ The module implements a subset of the Adobe RTMP protocol as specified by:
+ http://www.adobe.com/devnet/rtmp/pdf/rtmp_specification_1.0.pdf
+
+ (c) Adrian Smith (Triode), 2009, 2010, 2011, [email protected]
+
+ The protocol state machine and all packet processing is now implemented in
C to improve performance and resource demands.
+ The remaining lua code is used to create serialised rtmp request packets
which are passed to the C protocol implementation.
+
+ The implementation makes the following assumptions:
+
+ 1) streams use a streamingId of 1 (it ignores the streamingId inside the
amf0 _result response to a createStream message)
+ 2) only implements single byte chunk headers (chunk id <= 63)
+ 3) timestamps are not sent in any packets sent to the server (they are
always set to 0)
+
+*/
+
+#include "common.h"
+#include "jive.h"
+#include "audio/streambuf.h"
+
+#if defined(WIN32)
+
+#include <winsock2.h>
+
+typedef SOCKET socket_t;
+#define CLOSESOCKET(s) closesocket(s)
+#define SOCKETERROR WSAGetLastError()
+
+#else
+
+typedef int socket_t;
+#define CLOSESOCKET(s) close(s)
+#define SOCKETERROR errno
+
+#endif
+
+extern LOG_CATEGORY *log_audio_decode;
+
+struct stream {
+ socket_t fd;
+ // other stuff we don't use
+};
+
+#define min(x, y) ((x) < (y) ? (x) : (y))
+
+#define INCACHE_SLOTS 8 // slots for caching state for concurrent active
rtmp chunk channels - set to be higher than seen, which is 5
+#define BUFFER_UNTIL_TS 4500 // on stream start only transition to playing
state after this timestamp, avoids startup rebuffer
+
+typedef enum {
+ RTMP_IDLE = 0, RTMP_AWAIT_S0 = 1, RTMP_AWAIT_S1 = 2, RTMP_AWAIT_S2 = 3,
+ RTMP_SENT_CONNECT = 4, RTMP_SENT_CREATE_STREAM = 5,
RTMP_SENT_FC_SUBSCRIBE = 6, RTMP_SENT_PLAY = 7,
+ RTMP_BUFFERING = 8, RTMP_PLAYING = 9
+} rtmp_state;
+
+static const char *rtmp_state_name[] = {
+ "idle", "awaitS0", "awaitS1", "awaitS2", "sentConnect",
"sentCreateStream", "sentFCSubscribe", "sentPlay",
+ "Buffering", "Playing"
+};
+
+static rtmp_state state = RTMP_IDLE;
+static u8_t *hs_token = NULL;
+static unsigned recv_chunksize;
+static unsigned recv_bytes;
+static unsigned next_ack;
+static unsigned ack_wind;
+
+struct {
+ u8_t buf[4096*16];
+ u8_t *pos;
+ unsigned len;
+} inbuf;
+
+struct incache_entry {
+ unsigned chan;
+ u8_t type;
+ u8_t *buf;
+ unsigned len;
+ unsigned rem;
+ unsigned ts;
+ unsigned dts;
+};
+
+static struct incache_entry incache[INCACHE_SLOTS];
+
+// Move the protocol state machine to `new`, logging the transition.
+// Entering RTMP_IDLE additionally resets all per-connection receive state:
+// the input buffer, chunk size, ack counters and the chunk channel cache.
+void change_state(rtmp_state new) {
+	struct incache_entry *slot;
+
+	LOG_INFO(log_audio_decode, "rtmp state: %s -> %s", rtmp_state_name[state], rtmp_state_name[new]);
+	state = new;
+
+	if (state != RTMP_IDLE) {
+		return;
+	}
+
+	// back to idle - start again with an empty input buffer and default protocol parameters
+	inbuf.len = 0;
+	inbuf.pos = inbuf.buf;
+	recv_bytes = 0;
+	recv_chunksize = 128;
+	ack_wind = 20480;
+	next_ack = 20480;
+
+	// release any partially reassembled messages before wiping the cache
+	// (free(NULL) is a no-op so empty slots need no special casing)
+	for (slot = incache; slot < incache + INCACHE_SLOTS; slot++) {
+		free(slot->buf);
+	}
+	memset(incache, 0, sizeof(incache));
+}
+
+// Busy-waiting send used for outbound packets.
+// Packets are assumed to normally be smaller than the socket send buffer, so the
+// retry loop below is not expected to spin for long.
+//   s   - socket to write to
+//   buf - bytes to send
+//   len - number of bytes to send
+// Gives up (after logging) on any error other than EAGAIN / EWOULDBLOCK.
+void _send(socket_t s, u8_t *buf, size_t len) {
+	int stalled = 0;
+
+	while (len > 0) {
+		int n = send(s, buf, len, 0);
+
+		if (n >= 0) {
+			// partial or complete write - advance past what was accepted
+			buf += n;
+			len -= n;
+			continue;
+		}
+
+		if (SOCKETERROR != EAGAIN && SOCKETERROR != EWOULDBLOCK) {
+			LOG_ERROR(log_audio_decode, "problem writing, error: %s", strerror(SOCKETERROR));
+			return;
+		}
+
+		// socket buffer full - retry, reporting every tenth stall
+		stalled++;
+		if (stalled % 10 == 9) {
+			LOG_ERROR(log_audio_decode, "stalled writing, count: %d", stalled);
+		}
+	}
+}
+
+// Send an rtmp packet, fragmenting the body into 128 byte chunks if necessary.
+// All packets are assumed to start with a 12 byte type 0 header (no header compression).
+//   s   - socket to write to
+//   buf - complete packet: 12 byte t0 header followed by the body
+//   len - total packet length including the header
+// Packets shorter than a t0 header are rejected (logged) without touching buf.
+void send_rtmp(socket_t s, u8_t *buf, size_t len) {
+	u8_t cont_header;
+
+	if (len < 12) {
+		LOG_ERROR(log_audio_decode, "packet too short");
+		return;
+	}
+
+	// continuation chunks use a one byte type 3 header on the same chunk channel;
+	// only read the first byte once we know the packet is long enough to have one
+	cont_header = *buf | 0xc0;
+
+	// first 12 bytes are the t0 header
+	_send(s, buf, 12);
+	buf += 12;
+	len -= 12;
+
+	while (len > 0) {
+		// fragment into chunks of 128 bytes
+		size_t chunklen = min(len, 128);
+		_send(s, buf, chunklen);
+		buf += chunklen;
+		len -= chunklen;
+
+		// add fragment header if more
+		if (len > 0) {
+			_send(s, &cont_header, 1);
+		}
+	}
+}
+
+// Lua: rtmp.sendHandshake(stream)
+// Resets the protocol state and sends the client side of the rtmp handshake
+// (C0 version byte, C1 header and a 1528 byte random token) on the stream's socket.
+// The token is kept in hs_token so the S2 echo can be verified by readL.
+// Returns true on success, or nil plus an error string if the token cannot be allocated.
+int send_handshakeL(lua_State *L) {
+	struct stream *stream;
+	int i;
+
+	stream = lua_touserdata(L, 1);
+
+	// reset rtmp state
+	change_state(RTMP_IDLE);
+
+	if (!hs_token) {
+		hs_token = malloc(1528);
+		if (!hs_token) {
+			// without this check the fill loop below would write through NULL
+			LOG_ERROR(log_audio_decode, "unable to allocate handshake token");
+			lua_pushnil(L);
+			lua_pushstring(L, "out of memory");
+			return 2;
+		}
+	}
+
+	for (i = 0; i < 1528; ++i) {
+		hs_token[i] = rand() % 256;
+	}
+
+	// c0
+	_send(stream->fd, (u8_t*)"\x03", 1);
+	// c1 header
+	_send(stream->fd, (u8_t*)"\x00\x00\x00\x00\x00\x00\x00\x00", 8);
+	// c1 token
+	_send(stream->fd, hs_token, 1528);
+
+	change_state(RTMP_AWAIT_S0);
+
+	lua_pushboolean(L, TRUE);
+	return 1;
+}
+
+// Returns true if jive.audio.Rtmp.rtmpMessages[name] holds a string (or number),
+// i.e. the lua side has prepared a serialised packet of that name.
+// The lua stack is left as it was found.
+bool rtmp_packet_exists(lua_State *L, const char *name) {
+	int top = lua_gettop(L);
+	bool found;
+
+	// walk jive -> audio -> Rtmp -> rtmpMessages -> name
+	lua_getglobal(L, "jive");
+	lua_getfield(L, -1, "audio");
+	lua_getfield(L, -1, "Rtmp");
+	lua_getfield(L, -1, "rtmpMessages");
+	lua_getfield(L, -1, name);
+
+	found = lua_isstring(L, -1);
+
+	// discard the five values pushed above
+	lua_settop(L, top);
+
+	return found;
+}
+
+// Look up the preformatted packet jive.audio.Rtmp.rtmpMessages[name] and send it
+// on the socket of the stream object at lua stack index 1.
+// A missing packet is logged and otherwise ignored. The lua stack is left balanced.
+void send_rtmp_packet(lua_State *L, const char *name) {
+	struct stream *stream = lua_touserdata(L, 1);
+	int top = lua_gettop(L);
+
+	// get preformatted packets from lua
+	lua_getglobal(L, "jive");
+	lua_getfield(L, -1, "audio");
+	lua_getfield(L, -1, "Rtmp");
+	lua_getfield(L, -1, "rtmpMessages");
+	lua_getfield(L, -1, name);
+
+	if (!lua_isstring(L, -1)) {
+		LOG_INFO(log_audio_decode, "can't find rtmp packet: %s", name);
+	} else {
+		size_t len;
+		u8_t *packet;
+
+		LOG_INFO(log_audio_decode, "sending %s packet", name);
+		packet = (u8_t *)lua_tolstring(L, -1, &len);
+		send_rtmp(stream->fd, packet, len);
+	}
+
+	// drop the five lookup values
+	lua_settop(L, top);
+}
+
+// rtmp packet handlers - return false for error to force stream close
+
+// receive chunk size handler (message type 1)
+// The payload is a 4 byte big endian chunk size which applies to all chunks
+// subsequently received from the server.
+// Returns false (forcing the stream to close) if the payload is truncated or the
+// size is 0, as readL cannot make progress parsing chunks with a zero chunk size.
+bool messageType1(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+	unsigned size;
+
+	if (entry->len < 4) {
+		LOG_ERROR(log_audio_decode, "message type 1 - truncated message: %u bytes", entry->len);
+		return false;
+	}
+
+	// widen each byte before shifting - shifting a promoted u8_t into the sign bit of an int is undefined
+	size = ((unsigned)buf[0] << 24) | ((unsigned)buf[1] << 16) | ((unsigned)buf[2] << 8) | (unsigned)buf[3];
+
+	if (size == 0) {
+		LOG_ERROR(log_audio_decode, "message type 1 - invalid recv chunk size: 0");
+		return false;
+	}
+
+	recv_chunksize = size;
+	LOG_INFO(log_audio_decode, "message type 1 - set recv chunk size to: %d", recv_chunksize);
+	return true;
+}
+
+// abort channel handler (message type 2)
+// Discards any partially reassembled message held in `entry` and clears the slot.
+// NOTE(review): this clears the cache entry the abort message itself arrived on;
+// the rtmp spec carries the chunk stream id to abort in the payload - confirm which is intended.
+bool messageType2(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+	LOG_INFO(log_audio_decode, "message type 2 - abort for chunk channel: %d", entry->chan);
+
+	// free(NULL) is a no-op, so no need to test for a fragment buffer first
+	free(entry->buf);
+	memset(entry, 0, sizeof(*entry));
+
+	return true;
+}
+
+// ack received handler (message type 3)
+// The acknowledgement is only logged; buf and entry are not inspected.
+// Always returns true so the stream stays open.
+bool messageType3(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+ LOG_INFO(log_audio_decode, "message type 3 - ack received");
+ return true;
+}
+
+// user control message handler (message type 4)
+// The payload is a 2 byte big endian event type followed by event data.
+//   event 1 (EOF)          - returns false so the caller closes the stream
+//   event 6 (Ping Request) - echoes the 4 byte ping data back in an event 7 response
+//   other events           - logged only
+// Messages too short to hold the fields being read are logged and ignored rather
+// than read past the end of the message.
+bool messageType4(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+	unsigned event;
+	u8_t *data;
+
+	if (entry->len < 2) {
+		LOG_WARN(log_audio_decode, "message type 4 - truncated user control message: %u bytes - ignored", entry->len);
+		return true;
+	}
+
+	event = *(buf) << 8 | *(buf+1);
+	data = buf + 2;
+
+	switch (event) {
+	case 0:
+		LOG_INFO(log_audio_decode, "message type 4 - user control message event 0: Stream Begin");
+		break;
+	case 1:
+		LOG_INFO(log_audio_decode, "message type 4 - user control message event 1: EOF - exiting");
+		return false;
+	case 2:
+		LOG_INFO(log_audio_decode, "message type 4 - user control message event 2: Stream Dry");
+		break;
+	case 4:
+		LOG_INFO(log_audio_decode, "message type 4 - user control message event 4: Stream Is Recorded");
+		break;
+	case 6:
+		if (entry->len < 6) {
+			// the 4 bytes of ping data to echo are missing
+			LOG_WARN(log_audio_decode, "message type 4 - truncated ping request: %u bytes - ignored", entry->len);
+			break;
+		}
+		LOG_INFO(log_audio_decode, "message type 4 - user control message event 6: Ping Request - sending response");
+		{
+			struct stream *stream = lua_touserdata(L, 1);
+			u8_t *packet_template, packet[18];
+
+			packet_template = (u8_t *)
+				"\x02"             // chan 2, format 0
+				"\x00\x00\x00"     // timestamp (null)
+				"\x00\x00\x06"     // length [data should be 4 bytes]
+				"\x04"             // type 0x04
+				"\x00\x00\x00\x00" // streamId 0
+				"\x00\x07"         // event type 7
+				"\x00\x00\x00\x00";// (overwrite with data - 4 bytes)
+
+			memcpy(packet, packet_template, 18);
+			memcpy(packet + 14, data, 4);
+
+			send_rtmp(stream->fd, packet, 18);
+		}
+		break;
+
+	default:
+		LOG_DEBUG(log_audio_decode, "message type 4 - user control message event %d: ignored", event);
+	}
+	return true;
+}
+
+// window ack size handler (message type 5)
+// Decodes the 4 byte big endian window size for logging only; the value is not acted upon.
+bool messageType5(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+	unsigned window = buf[0] << 24 | buf[1] << 16 | buf[2] << 8 | buf[3];
+
+	LOG_INFO(log_audio_decode, "message type 5 - window ack size: %d - ignored", window);
+
+	return true;
+}
+
+// set window size handler (message type 6 - set peer bandwidth)
+// The payload is a 4 byte big endian window size followed by a 1 byte limit type.
+// Replies with a window ack size (type 5) packet echoing the window and sets our
+// own acknowledgement interval to half of it.
+// A message too short to contain the window is logged and ignored.
+bool messageType6(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+	struct stream *stream = lua_touserdata(L, 1);
+	unsigned window, limit;
+	u8_t *packet_template, packet[16];
+
+	if (entry->len < 4) {
+		LOG_WARN(log_audio_decode, "message type 6 - truncated message: %u bytes - ignored", entry->len);
+		return true;
+	}
+
+	// widen each byte before shifting - shifting a promoted u8_t into the sign bit of an int is undefined
+	window = ((unsigned)buf[0] << 24) | ((unsigned)buf[1] << 16) | ((unsigned)buf[2] << 8) | (unsigned)buf[3];
+	// limit type is only used for logging; report 0 if the byte is absent
+	limit = (entry->len >= 5) ? *(buf+4) : 0;
+
+	// send window ack packet
+	LOG_INFO(log_audio_decode, "message type 6 - set peer BW: %d limit type: %d", window, limit);
+
+	packet_template = (u8_t *)
+		"\x02"             // chan 2, format 0
+		"\x00\x00\x00"     // timestamp (null)
+		"\x00\x00\x04"     // length
+		"\x05"             // type 0x05
+		"\x00\x00\x00\x00" // streamId 0
+		"\x00\x00\x00\x00";// (overwrite with window)
+
+	memcpy(packet, packet_template, 16);
+	// buf[0-3] is window - copy it
+	memcpy(packet+12, buf, 4);
+	send_rtmp(stream->fd, packet, 16);
+
+	ack_wind = window / 2;
+	return true;
+}
+
+// audio packet handler (message type 8)
+// Handles two payload formats, selected by the first byte(s):
+//   0xAF 0x00 - AAC config: builds the static part of an ADTS header from the config
+//   0xAF 0x01 - AAC audio:  prepends a 7 byte ADTS header and feeds the frame to streambuf
+//   0x2X      - MP3 audio:  feeds the payload (minus the 1 byte tag) to streambuf
+// Also drives the transition to the buffering / playing states, calling
+// jive.audio.Rtmp.streamStartEvent() when playback should start.
+// Returns false if streambuf has no room for the frame (not handled - stream is closed).
+bool messageType8(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+	int n = streambuf_get_freebytes();
+
+	// the length checks stop truncated packets underflowing the unsigned "len - 2" / "len - 1" sizes below
+	if (entry->len >= 2 && *buf == 0xAF) {
+		static u8_t adts[7]; // adts static header, set by aac config
+
+		if (*(buf + 1) == 0x01) {
+			// AAC audio
+			u8_t header[7];
+			unsigned framesize = entry->len - 2 + 7;
+			memcpy(header, adts, 7);
+			// the 13 bit adts frame length straddles header bytes 3 to 5
+			header[3] |= ((framesize >> 11) & 0x03);
+			header[4] |= ((framesize >> 3) & 0xFF);
+			header[5] |= ((framesize << 5) & 0xE0);
+			LOG_DEBUG(log_audio_decode, "aac audio data: %d timestamp: %d", framesize, entry->ts);
+			if (n > 0 && (unsigned)n > framesize) {
+				streambuf_feed(header, 7);
+				streambuf_feed(buf + 2, entry->len - 2);
+			} else {
+				LOG_ERROR(log_audio_decode, "panic - not enough space in streambuf - case not handled by implementation");
+				return false;
+			}
+
+		} else if (*(buf + 1) == 0x00 && entry->len >= 4) {
+			// AAC config (needs the two config bytes at buf[2] and buf[3])
+			unsigned profile = 1; // hard coded, ignore config
+			unsigned sr_index = ((*(buf+2) << 8 | *(buf+3)) & 0x0780) >> 7;
+			unsigned channels = (*(buf+3) & 0x78) >> 3;
+			LOG_INFO(log_audio_decode, "aac config: profile: %d sr_index: %d channels: %d", profile, sr_index, channels);
+			adts[0] = 0xFF;
+			adts[1] = 0xF9;
+			adts[2] = ((profile << 6) & 0xC0) | ((sr_index << 2) & 0x3C) | ((channels >> 2) & 0x1);
+			adts[3] = ((channels << 6) & 0xC0);
+			adts[4] = 0x00;
+			adts[5] = ((0x7FF >> 6) & 0x1F);
+			adts[6] = ((0x7FF << 2) & 0xFC);
+		}
+
+	} else if (entry->len >= 1 && (*buf & 0xF0) == 0x20) {
+		// MP3 audio
+		LOG_DEBUG(log_audio_decode, "mp3 audio data: %d timestamp: %d", entry->len - 1, entry->ts);
+		if (n >= 0 && (unsigned)n >= entry->len - 1) {
+			streambuf_feed(buf + 1, entry->len - 1);
+		} else {
+			LOG_ERROR(log_audio_decode, "panic - not enough space in streambuf - case not handled by implementation");
+			return false;
+		}
+	}
+
+	if (state < RTMP_PLAYING) {
+
+		bool send_start = false;
+
+		if (state < RTMP_BUFFERING) {
+			// only streams with a "live" packet buffer before starting
+			if (!rtmp_packet_exists(L, "live")) {
+				send_start = true;
+			} else {
+				change_state(RTMP_BUFFERING);
+			}
+		}
+
+		if (state == RTMP_BUFFERING && entry->ts > BUFFER_UNTIL_TS) {
+			send_start = true;
+		}
+
+		if (send_start) {
+			int top = lua_gettop(L);
+
+			// send streamStartEvent to start playback
+			lua_getglobal(L, "jive");
+			lua_getfield(L, -1, "audio");
+			lua_getfield(L, -1, "Rtmp");
+			lua_getfield(L, -1, "streamStartEvent");
+			if (lua_pcall(L, 0, 0, 0) != 0) {
+				fprintf(stderr, "error running streamStartEvent: %s\n", lua_tostring(L, -1));
+			}
+			// drop the lookup tables (and any error message) so the stack does not grow per call
+			lua_settop(L, top);
+
+			change_state(RTMP_PLAYING);
+		}
+	}
+
+	return true;
+}
+
+// metadata handler (message type 18)
+// Passes the raw message body to jive.audio.Rtmp.sendMeta() so it can be
+// forwarded to the server for debug. Always returns true.
+bool messageType18(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+	int top = lua_gettop(L);
+
+	LOG_INFO(log_audio_decode, "message type 18 - meta");
+
+	// send to server for debug
+	lua_getglobal(L, "jive");
+	lua_getfield(L, -1, "audio");
+	lua_getfield(L, -1, "Rtmp");
+	lua_getfield(L, -1, "sendMeta");
+	lua_pushlstring(L, (const char *)buf, entry->len);
+	if (lua_pcall(L, 1, 0, 0) != 0) {
+		fprintf(stderr, "error running sendMeta: %s\n", lua_tostring(L, -1));
+	}
+
+	// lua_pcall only pops the function and its argument; drop the three lookup
+	// tables (and any error message) too, otherwise every metadata message handled
+	// within a single read call leaves more values on the lua stack
+	lua_settop(L, top);
+
+	return true;
+}
+
+// helper for messageType20 which returns true if string exists anywhere within buf
+//   buf    - bytes to search (need not be nul terminated)
+//   len    - number of bytes in buf
+//   string - nul terminated string to look for
+// Every start position is tested in full, so a match is still found when it
+// directly follows a false start (e.g. "_result" within "__result").
+// An empty string matches any buffer.
+bool bufmatch(u8_t *buf, size_t len, const char *string) {
+	size_t string_len = strlen(string);
+	size_t i;
+
+	if (string_len == 0) {
+		return true;
+	}
+	if (string_len > len) {
+		return false;
+	}
+
+	for (i = 0; i + string_len <= len; i++) {
+		if (memcmp(buf + i, string, string_len) == 0) {
+			return true;
+		}
+	}
+	return false;
+}
+
+// message type 20 (amf0 command message)
+// The body is not parsed; it is forwarded to jive.audio.Rtmp.sendMeta() for debug
+// and then searched for well known strings to drive the connection state machine:
+//   _result       - advance connect -> createStream -> FCSubscribe / play
+//   _error        - returns false to close the stream
+//   onFCSubscribe - send play
+//   onStatus      - returns false for the failure / completion status codes listed below
+bool messageType20(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+	int top = lua_gettop(L);
+
+	LOG_INFO(log_audio_decode, "message type 20");
+
+	// send packet to server for debug
+	lua_getglobal(L, "jive");
+	lua_getfield(L, -1, "audio");
+	lua_getfield(L, -1, "Rtmp");
+	lua_getfield(L, -1, "sendMeta");
+	lua_pushlstring(L, (const char *)buf, entry->len);
+	if (lua_pcall(L, 1, 0, 0) != 0) {
+		fprintf(stderr, "error running sendMeta: %s\n", lua_tostring(L, -1));
+	}
+	// lua_pcall only pops the function and its argument; drop the lookup tables
+	// (and any error message) so repeated messages do not grow the lua stack
+	lua_settop(L, top);
+
+	if (bufmatch(buf, entry->len, "_result")) {
+
+		if (state == RTMP_SENT_CONNECT) {
+
+			LOG_INFO(log_audio_decode, "sending createStream");
+			send_rtmp_packet(L, "create");
+			change_state(RTMP_SENT_CREATE_STREAM);
+
+		} else if (state == RTMP_SENT_CREATE_STREAM) {
+
+			// FCSubscribe is optional - only sent if lua provided the packet
+			if (rtmp_packet_exists(L, "subscribe")) {
+
+				LOG_INFO(log_audio_decode, "sending FCSubscribe");
+				send_rtmp_packet(L, "subscribe");
+				change_state(RTMP_SENT_FC_SUBSCRIBE);
+
+			} else {
+
+				LOG_INFO(log_audio_decode, "sending play");
+				send_rtmp_packet(L, "play");
+				change_state(RTMP_SENT_PLAY);
+
+			}
+		}
+
+	} else if (bufmatch(buf, entry->len, "_error")) {
+
+		LOG_WARN(log_audio_decode, "stream error");
+		return false;
+
+	} else if (bufmatch(buf, entry->len, "onFCSubscribe")) {
+
+		LOG_INFO(log_audio_decode, "sending play");
+		send_rtmp_packet(L, "play");
+		change_state(RTMP_SENT_PLAY);
+
+	} else if (bufmatch(buf, entry->len, "onStatus")) {
+
+		LOG_INFO(log_audio_decode, "onStatus");
+
+		if (bufmatch(buf, entry->len, "NetStream.Failed") ||
+			bufmatch(buf, entry->len, "NetStream.Play.Failed") ||
+			bufmatch(buf, entry->len, "NetStream.Play.StreamNotFound") ||
+			bufmatch(buf, entry->len, "NetConnection.Connect.InvalidApp") ||
+			bufmatch(buf, entry->len, "NetStream.Play.Complete") ||
+			bufmatch(buf, entry->len, "NetStream.Play.Stop")) {
+
+			LOG_WARN(log_audio_decode, "error status received - closing stream");
+
+			return false;
+		}
+	}
+
+	return true;
+}
+
+// Helper for readL: begin a new message on a chunk channel.
+//   entry   - cache entry for the channel; entry->len must already hold the message length
+//   payload - the payload bytes carried by the first chunk
+//   have    - number of payload bytes in that chunk
+//   oom     - set to true if a fragment buffer could not be allocated
+// Returns payload if the whole message is contained in this chunk. Otherwise copies
+// the fragment into a freshly allocated entry->buf, records the number of bytes
+// still to come in entry->rem and returns NULL.
+static u8_t *rtmp_begin_message(struct incache_entry *entry, u8_t *payload, unsigned have, bool *oom) {
+	if (have == entry->len) {
+		return payload;
+	}
+
+	if (entry->buf) {
+		free(entry->buf);
+	}
+	entry->buf = malloc(entry->len);
+	if (!entry->buf) {
+		entry->rem = 0;
+		*oom = true;
+		return NULL;
+	}
+
+	memcpy(entry->buf, payload, have);
+	entry->rem = entry->len - have;
+	return NULL;
+}
+
+// Lua: rtmp.read(stream, playback)
+// Reads whatever is available from the stream's socket into inbuf and processes it:
+// first the S0/S1/S2 handshake, then rtmp chunks which are reassembled per chunk
+// channel and dispatched to the messageTypeN handlers. Also sends acknowledgements
+// once recv_bytes passes next_ack.
+// Returns:
+//   0           - socket would block and nothing is buffered
+//   1           - data was processed (or is buffered awaiting more)
+//   nil, errstr - socket error, bad handshake, unsupported chunk header, out of memory
+//                 or a handler requested close; the socket has been closed
+// NOTE(review): a recv() result of 0 (peer closed the connection) is not treated as an
+// error here, matching the previous behaviour - confirm the caller detects this case.
+int readL(lua_State *L) {
+	struct stream *stream;
+	bool readmore = true;
+	/*
+	 * 1: Stream (self)
+	 * 2: Playback (self)
+	 */
+
+	stream = lua_touserdata(L, 1);
+
+	// shuffle existing data in inbuf to start
+	// source and destination may overlap so memmove is required; the position is
+	// rewound even when the buffer is empty so the space available to recv does not
+	// shrink each time the buffer drains part way through
+	if (inbuf.len) {
+		memmove(inbuf.buf, inbuf.pos, inbuf.len);
+	}
+	inbuf.pos = inbuf.buf;
+
+	while (readmore) {
+
+		size_t space;
+		int n;
+
+		readmore = false;
+
+		if (inbuf.len == 0) {
+			inbuf.pos = inbuf.buf;
+		} else if ((size_t)(inbuf.pos - inbuf.buf) > sizeof(inbuf.buf) / 2) {
+			// shuffle to the start if only using second half of buffer (i.e. don't do each loop)
+			memmove(inbuf.buf, inbuf.pos, inbuf.len);
+			inbuf.pos = inbuf.buf;
+		}
+
+		space = sizeof(inbuf.buf) - ((inbuf.pos - inbuf.buf) + inbuf.len);
+		n = recv(stream->fd, inbuf.pos + inbuf.len, space, 0);
+
+		if (n < 0) {
+			// take a copy of the error now, closing the socket may change it
+			int err = SOCKETERROR;
+
+			if (err == EAGAIN || err == EWOULDBLOCK) {
+				if (inbuf.len == 0) {
+					lua_pushinteger(L, 0);
+					return 1;
+				}
+				// otherwise fall through and process what is already buffered
+			} else {
+				LOG_ERROR(log_audio_decode, "socket closed, %s", strerror(err));
+				CLOSESOCKET(stream->fd);
+				lua_pushnil(L);
+				lua_pushstring(L, strerror(err));
+				return 2;
+			}
+		} else {
+			inbuf.len += n;
+			recv_bytes += n;
+		}
+
+		// handshake phase
+		if (state < RTMP_SENT_CONNECT) {
+
+			if (state == RTMP_AWAIT_S0 && inbuf.len >= 1 && *inbuf.pos == 0x03) {
+				inbuf.pos += 1;
+				inbuf.len -= 1;
+				change_state(RTMP_AWAIT_S1);
+			}
+
+			if (state == RTMP_AWAIT_S1 && inbuf.len >= 1536) {
+				// c2 is s1 echoed back with our (null) timestamp in the second field
+				_send(stream->fd, inbuf.pos, 4);
+				_send(stream->fd, (unsigned char *) "\x00\x00\x00\x00", 4);
+				_send(stream->fd, inbuf.pos + 8, 1528);
+				inbuf.pos += 1536;
+				inbuf.len -= 1536;
+				change_state(RTMP_AWAIT_S2);
+			}
+
+			if (state == RTMP_AWAIT_S2 && inbuf.len >= 1536) {
+				// the token is random binary data which may contain zero bytes, so it must
+				// be compared with memcmp - strncmp would stop at the first zero
+				if (hs_token && !memcmp(inbuf.pos + 8, hs_token, 1528)) {
+					free(hs_token);
+					hs_token = NULL;
+					inbuf.pos += 1536;
+					inbuf.len -= 1536;
+					send_rtmp_packet(L, "connect");
+					change_state(RTMP_SENT_CONNECT);
+				} else {
+					LOG_ERROR(log_audio_decode, "bad handshake token");
+					CLOSESOCKET(stream->fd);
+					lua_pushnil(L);
+					lua_pushstring(L, "bad handshake token");
+					return 2;
+				}
+			}
+		}
+
+		// connected phase
+		if (state >= RTMP_SENT_CONNECT && inbuf.len > 0) {
+
+			u8_t *hdr = inbuf.pos;
+			unsigned chan = *hdr & 0x3f;
+			unsigned fmt = (*hdr & 0xc0) >> 6;
+			u8_t *dpos = NULL;
+			bool reasembled = false;
+			bool oom = false;
+			struct incache_entry *entry;
+			int i;
+			char *error = NULL;
+
+			// find or create a cache entry for this chan in the incache
+			for (i = 0; i < INCACHE_SLOTS; i++) {
+				entry = &incache[i];
+				if (chan == entry->chan) {
+					break;
+				} else if (!entry->chan) {
+					entry->chan = chan;
+					break;
+				}
+			}
+
+			if (i == INCACHE_SLOTS) {
+				error = "run out of incache slots";
+			} else if (chan == 0 || chan == 1) {
+				// chan values 0 and 1 indicate the multi byte chunk header forms
+				error = "rtmp chan > 63 - not supported";
+			}
+
+			if (error) {
+				LOG_ERROR(log_audio_decode, "%s", error);
+				CLOSESOCKET(stream->fd);
+				lua_pushnil(L);
+				lua_pushstring(L, error);
+				return 2;
+			}
+
+			if (fmt == 0 && inbuf.len >= 12) {
+
+				// type 0 header: absolute timestamp, length, type and stream id
+				unsigned msglen = (hdr[4] << 16) | (hdr[5] << 8) | hdr[6];
+				unsigned need = min(msglen, recv_chunksize) + 12;
+
+				if (inbuf.len >= need) {
+					entry->type = hdr[7];
+					entry->len = msglen;
+					entry->ts = (hdr[1] << 16) | (hdr[2] << 8) | hdr[3];
+					dpos = rtmp_begin_message(entry, hdr + 12, need - 12, &oom);
+					inbuf.pos += need;
+					inbuf.len -= need;
+					readmore = true;
+				}
+
+			} else if (fmt == 1 && inbuf.len >= 8) {
+
+				// type 1 header: timestamp delta, length and type
+				unsigned msglen = (hdr[4] << 16) | (hdr[5] << 8) | hdr[6];
+				unsigned need = min(msglen, recv_chunksize) + 8;
+
+				if (inbuf.len >= need) {
+					entry->type = hdr[7];
+					entry->len = msglen;
+					entry->dts = (hdr[1] << 16) | (hdr[2] << 8) | hdr[3];
+					entry->ts += entry->dts;
+					dpos = rtmp_begin_message(entry, hdr + 8, need - 8, &oom);
+					inbuf.pos += need;
+					inbuf.len -= need;
+					readmore = true;
+				}
+
+			} else if (fmt == 2 && entry->type) {
+
+				// type 2 header: timestamp delta only, length and type as previous message
+				unsigned need = min(entry->len, recv_chunksize) + 4;
+
+				if (inbuf.len >= need) {
+					entry->dts = (hdr[1] << 16) | (hdr[2] << 8) | hdr[3];
+					entry->ts += entry->dts;
+					dpos = rtmp_begin_message(entry, hdr + 4, need - 4, &oom);
+					inbuf.pos += need;
+					inbuf.len -= need;
+					readmore = true;
+				}
+
+			} else if (fmt == 3 && entry->rem) {
+
+				// type 3 header continuing a fragmented message
+				unsigned need = min(entry->rem, recv_chunksize) + 1;
+
+				if (inbuf.len >= need) {
+					// add to existing fragment
+					memcpy(entry->buf + entry->len - entry->rem, hdr + 1, need - 1);
+					entry->rem -= (need - 1);
+					if (!entry->rem) {
+						reasembled = true;
+					}
+					inbuf.pos += need;
+					inbuf.len -= need;
+
+					readmore = true;
+				}
+
+			} else if (fmt == 3 && entry->type) {
+
+				// type 3 header starting a new message: everything as previous message
+				unsigned need = min(entry->len, recv_chunksize) + 1;
+
+				if (inbuf.len >= need) {
+					entry->ts += entry->dts;
+					dpos = rtmp_begin_message(entry, hdr + 1, need - 1, &oom);
+					inbuf.pos += need;
+					inbuf.len -= need;
+
+					readmore = true;
+				}
+			}
+
+			if (oom) {
+				LOG_ERROR(log_audio_decode, "out of memory reassembling rtmp message");
+				CLOSESOCKET(stream->fd);
+				lua_pushnil(L);
+				lua_pushstring(L, "out of memory");
+				return 2;
+			}
+
+			if (dpos || reasembled) {
+				// complete message - either in place within inbuf or reassembled in entry->buf
+				u8_t *buf = dpos ? dpos : entry->buf;
+				bool ok = true;
+
+				switch(entry->type) {
+				case 1:  ok = messageType1(L, buf, entry);  break;
+				case 2:  ok = messageType2(L, buf, entry);  break;
+				case 3:  ok = messageType3(L, buf, entry);  break;
+				case 4:  ok = messageType4(L, buf, entry);  break;
+				case 5:  ok = messageType5(L, buf, entry);  break;
+				case 6:  ok = messageType6(L, buf, entry);  break;
+				case 8:  ok = messageType8(L, buf, entry);  break;
+				case 18: ok = messageType18(L, buf, entry); break;
+				case 20: ok = messageType20(L, buf, entry); break;
+				default:
+					LOG_DEBUG(log_audio_decode, "unhandled rtmp packet type: %d", entry->type);
+				}
+
+				if (!ok) {
+					LOG_ERROR(log_audio_decode, "handler returned false - closing stream");
+					CLOSESOCKET(stream->fd);
+					lua_pushnil(L);
+					lua_pushstring(L, "handler returned false - closing stream");
+					return 2;
+				}
+			}
+		}
+
+		if (recv_bytes > next_ack) {
+			u8_t *packet_template, packet[16];
+
+			LOG_DEBUG(log_audio_decode, "sending ack: %u", recv_bytes);
+
+			// send ack packet
+			packet_template = (u8_t *)
+				"\x02"             // chan 2, format 0
+				"\x00\x00\x00"     // timestamp (null)
+				"\x00\x00\x04"     // length
+				"\x03"             // type 0x03
+				"\x00\x00\x00\x00" // streamId 0
+				"\x00\x00\x00\x00";// (overwrite with recv_bytes)
+
+			memcpy(packet, packet_template, 16);
+			*(packet+12) = (recv_bytes & 0xFF000000) >> 24;
+			*(packet+13) = (recv_bytes & 0x00FF0000) >> 16;
+			*(packet+14) = (recv_bytes & 0x0000FF00) >> 8;
+			*(packet+15) = (recv_bytes & 0x000000FF);
+
+			send_rtmp(stream->fd, packet, 16);
+			next_ack += ack_wind;
+		}
+	}
+
+	lua_pushinteger(L, 1);
+	return 1;
+}
+
+// functions exported to lua by this module: rtmp.sendHandshake and rtmp.read
+static const struct luaL_Reg rtmp_f [] = {
+ { "sendHandshake", send_handshakeL },
+ { "read", readL },
+ { NULL, NULL }
+};
+
+// module entry point, called by lua on require("rtmp")
+// registers the functions in rtmp_f under the module name "rtmp" and returns 1 value to lua
+int luaopen_rtmp (lua_State *L) {
+ luaL_register(L, "rtmp", rtmp_f);
+ return 1;
+}
Modified: 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua
URL:
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua
(original)
+++ 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Playback.lua Sat Mar
5 03:04:13 2011
@@ -1,5 +1,5 @@
-local assert, tostring, type, pairs, ipairs, getmetatable = assert, tostring,
type, pairs, ipairs, getmetatable
+local assert, tostring, type, pairs, ipairs, getmetatable, require = assert,
tostring, type, pairs, ipairs, getmetatable, require
local oo = require("loop.base")
@@ -10,7 +10,6 @@
local hasDecode, decode = pcall(require, "squeezeplay.decode")
local hasSprivate, spprivate = pcall(require, "spprivate")
local Stream = require("squeezeplay.stream")
-local Rtmp = require("jive.audio.Rtmp")
local SlimProto = require("jive.net.SlimProto")
local Player = require("jive.slim.Player")
@@ -118,7 +117,8 @@
slimproto:capability("Spdirect", cap)
end
- Rtmp:init(slimproto)
+ -- signal we are Rtmp capable, but don't load module until used
+ slimproto:capability("Rtmp", 2)
self.mode = 0
self.threshold = 0
@@ -466,7 +466,8 @@
m.read = reader
m.write = writer
elseif self.flags & 0x20 == 0x20 then
- -- use Rtmp methods
+ -- use Rtmp methods, load Rtmp module on demand
+ Rtmp = require("jive.audio.Rtmp")
m.read = Rtmp.read
m.write = Rtmp.write
else
Modified: 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua
URL:
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua (original)
+++ 7.6/trunk/squeezeplay/src/squeezeplay/share/jive/audio/Rtmp.lua Sat Mar 5
03:04:13 2011
@@ -1,30 +1,32 @@
-- This module implements a subset of the Adobe RTMP protocol as specified by:
-- http://www.adobe.com/devnet/rtmp/pdf/rtmp_specification_1.0.pdf
--
--- It provides direct streaming for rtmp in conjunction with the RTMP.pm
server protocol handler
--- which is contained with the BBCiPlayer plugin. It is intended for use with
BBC live and listen
--- again streams as supported by the BBCiPlayer plugin.
---
--- (c) Triode, 2009, [email protected]
+-- It provides direct streaming for rtmp and is primarily designed to be used
with the BBCRadio applet or BBCiPlayer plugin
+--
+-- (c) Adrian Smith (Triode), 2009, 2010, 2011, [email protected]
--
-- The implementation (api v2) here contains both the low level state machine
for processing the rtmp protocol and
-- serialisation code for generating amf0 request objects. Parsing of amf0
responses is not implemented here and requires
-- server support.
--
--- It makes the following assumptions:
+-- The protocol state machine and all packet processing is now implemented in
C to improve performance and resource demands.
+-- The remaining lua code is used to create serialised rtmp request packets
which are passed to the C protocol implementation.
+--
+-- The implementation makes the following assumptions:
--
-- 1) streams use a streamingId of 1 (it ignores the streamingId inside the
amf0 _result reponse to a createStream message)
-- 2) only implements single byte chunk headers (chunk id < 63)
--- 3) does not implement timestamps - a value of 0 is sent in any timestamp
field sent to the rtmp server
+-- 3) timestamps are not send in any packets sent to the server (they are
always set to 0)
--
-- Due to the way the stream object is created it is necessary to switch
methods in the stream object's meta table
-- so that the Rtmp read and write methods here are used (this is done in
Playback.lua)
+--
local string, table, math, pairs, type = string, table, math, pairs, type
local Stream = require("squeezeplay.stream")
-
local mime = require("mime")
+local rtmpC = require("rtmp")
local debug = require("jive.utils.debug")
local log = require("jive.utils.log").logger("audio.decode")
@@ -33,87 +35,18 @@
local FLASH_VER = "LNX 10,0,22,87"
+-- C read method
+read = rtmpC.read
-- session params (can't be stored in the object as we reuse the streambuf
object)
-local inBuf, outBuf, state, token, inCaches, rtmpMessages
-local ackWindow, nextAck, receivedBytes
-local sendChunkSize, recvChunkSize
-local adts1, adts2, adts3, adts4, adts5, adts6, adts7
-
+rtmpMessages = {} -- not local so it can be accessed from C
local slimproto
-
-
-function init(self, slimprotoObj)
- slimproto = slimprotoObj
- -- api version 2 includes abilty to format and serialise amf objects
- slimproto:capability("Rtmp", 2)
-end
-
-
-local function unpackNumber(str, pos, len, le)
- local v = 0
- if le then
- for i = pos + len - 1, pos, - 1 do
- v = (v << 8) | (string.byte(str, i) or 0)
- end
- else
- for i = pos, pos + len - 1 do
- v = (v << 8) | (string.byte(str, i) or 0)
- end
- end
- return v
-end
-
-
-local function packNumber(v, len, le)
- local t = {}
- for i = 1, len do
- t[#t + 1] = string.char(v & 0xFF)
- v = v >> 8
- end
- local str = table.concat(t)
- return le and str or string.reverse(str)
-end
-
-
-local function changeState(newstate)
- log:info(state, " -> ", newstate)
- state = newstate
-end
-
-
-function sendRtmp(stream, rtmp)
- -- send rtmp packets fragmenting if neccessary
- -- assume all packets have a t0 header (no header compression)
- local header = string.sub(rtmp, 1, 12)
- local body = string.sub(rtmp, 13)
-
- stream:_streamWrite(nil, header)
-
- len = string.len(body)
-
- while len > 0 do
- if len > sendChunkSize then
- len = sendChunkSize
- end
- local chunk = string.sub(body, 1, len)
- body = string.sub(body, len + 1)
-
- stream:_streamWrite(nil, chunk)
- len = string.len(body)
-
- if len > 0 then
- stream:_streamWrite(nil, string.char(
string.byte(header, 1) | 0xc0 ) )
- end
- end
-end
function write(stream, playback, header)
-- initialise
- inBuf, outBuf, token, inCaches, rtmpMessages, sendChunkSize, recvChunkSize = "", "", "", {}, {}, 128, 128
- ackWindow, nextAck, receivedBytes = 10240, 10240, 0
- state = "reset"
+ rtmpMessages = {}
+ slimproto = playback.slimproto
-- extract the pre built rtmp packets or params within the header
for k, v in string.gmatch(header, "(%w+)=([a-zA-Z0-9%/%+]+%=*)&") do
@@ -130,517 +63,42 @@
end
end
- -- create the handshake token
- for i = 1, 1528 do
- token = token .. string.char(math.random(0,255))
- end
-
- -- send RTMP handshake
- -- c0
- local c0 = string.char(0x03)
-
- -- c1 [ assume timestamp of 0 ]
- local c1 = string.char(0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00)
.. token
-
- changeState("hsAwaitS0")
-
- return stream:_streamWrite(nil, c0 .. c1)
-end
-
-
-local hsHanders = {
-
- hsAwaitS0 = { 1,
- function(stream, s0)
- if string.byte(s0, 1) == 0x03 then
- return 'hsAwaitS1'
- else
- log:warn("did not get S0 response")
- return nil
- end
- end
- },
-
- hsAwaitS1 = { 1536,
- function(stream, s1)
- local time1 = string.sub(s1, 1, 4)
- local mytime = string.char(0x00,
0x00, 0x00, 0x00)
- local c2 = time1 .. mytime ..
string.sub(s1, 9)
- stream:_streamWrite(nil, c2)
- return 'hsAwaitS2'
- end
- },
-
- hsAwaitS2 = { 1536,
- function(stream, s2)
- local rand = string.sub(s2, 9)
- if rand == token then
- sendRtmp(stream,
rtmpMessages["connect"])
- return 'sentConnect'
- else
- return nil
- end
- end
- },
-}
-
-local rtmpHandlers = {
-
- [1] = function(stream, rtmp)
- recvChunkSize = unpackNumber(rtmp["body"], 1, 4)
- log:info("message type 1 - set recv chunk size to ",
recvChunkSize)
- end,
-
- [2] = function(stream, rtmp)
- log:info("message type 2 - abort for chunk channel
", rtmp["chan"])
- inCaches[ rtmp["chan"] ] = {}
- end,
-
- [3] = function(stream, rtmp)
- log:info("ack received")
- end,
-
- [4] = function(stream, rtmp)
- local event = unpackNumber(rtmp["body"], 1, 2)
- local data = string.sub(rtmp["body"], 3)
-
- if event == 0 then
- log:info("message type 4 - user control
message ", event, ": Stream Begin")
- elseif event == 1 then
- log:info("message type 4 - user control
message ", event, ": EOF - exiting")
- return false, true
- elseif event == 2 then
- log:info("message type 4 - user control
message ", event, ": Stream Dry")
- elseif event == 4 then
- log:info("message type 4 - user control
message ", event, ": Stream Is Recorded")
- elseif event == 6 then
- log:info("message type 4 - user control
message ", event, ": Ping Request - sending response")
- sendRtmp(stream,
- string.char(0x02,
-- chan 2, format 0
-
0x00, 0x00, 0x00, -- timestamp (not implemented)
-
0x00, 0x00, 0x06, -- length [data should be 4 bytes]
-
0x04, -- type 0x04
-
0x00, 0x00, 0x00, 0x00,-- streamId 0
-
0x00, 0x07) .. -- event type 7
- data)
-- return data
-
- else
- log:debug("message type 4 - user control
message ", event, ": ignored")
- end
- end,
-
- [5] = function(stream, rtmp)
- local window = unpackNumber(rtmp["body"], 1, 4)
- log:info("message type 5 - window ack size: ",
window, " - ignored")
- end,
-
- [6] = function(stream, rtmp)
- local window = unpackNumber(rtmp["body"], 1, 4)
- local limit = unpackNumber(rtmp["body"], 5, 1)
-
- log:info("message type 5 - set peer BW: ", window, "
limit type ", limit, " - sending response")
- ackWindow = window / 2
- sendRtmp(stream,
- string.char(0x02,
-- chan 2, format 0
- 0x00,
0x00, 0x00, -- timestamp (not implemented)
- 0x00,
0x00, 0x04, -- length
- 0x05,
-- type 0x05
- 0x00,
0x00, 0x00, 0x00, -- streamId 0
- (window
& 0xFF000000) >> 24,-- window as u32 be
- (window
& 0x00FF0000) >> 16,
- (window
& 0x0000FF00) >> 8,
- (window
& 0x000000FF)
- ))
- end,
-
- [8] = function(stream, rtmp)
- local byte1, byte2 = string.byte(rtmp["body"], 1, 2)
- if not byte1 then
- return 0
- end
-
- if byte1 == 0xAF and byte2 == 0x01 then
- -- AAC
- --log:debug("message type 8 - AAC audiodata,
len:", rtmp["length"])
- local framesize = rtmp["length"] - 2 + 7
- local header = string.char(adts1, adts2,
adts3,
-
adts4 | ((framesize >> 11) & 0x03),
-
adts5 | ((framesize >> 3) & 0xFF),
-
adts6 | ((framesize << 5) & 0xE0),
-
adts7)
-
- outBuf = outBuf .. header ..
string.sub(rtmp["body"], 3)
-
- elseif byte1 == 0xAF and byte2 == 0x00 then
- -- AAC Config
- local firstword = unpackNumber(rtmp["body"],
1, 4)
- local profile = 1
- local sr_index = (firstword & 0x00000780)
>> 7
- local channels = (firstword & 0x00000078)
>> 3
-
- log:info("message type 8 - AAC config,
profile: ", profile, " sr_index: ", sr_index, " channels: ", channels)
-
- adts1 = 0xFF
- adts2 = 0xF9
- adts3 = ((profile << 6) & 0xC0) | ((sr_index
<< 2) & 0x3C) | ((channels >> 2) & 0x1)
- adts4 = ((channels << 6) & 0xC0)
- adts5 = 0x00
- adts6 = ((0x7FF >> 6) & 0x1F)
- adts7 = ((0x7FF << 2) & 0xFC)
-
- elseif (byte1 & 0xF0) == 0x20 then
- -- MP3
- --log:debug("message type 8 - MP3 audiodata,
len:", rtmp["length"])
-
- outBuf = outBuf .. string.sub(rtmp["body"],
2)
- end
-
- if state ~= "Playing" then
- if state ~= "Buffering" then
- if rtmpMessages["meta"] ~= nil then
- slimproto:send({ opcode =
"RESP", headers = "" })
- end
- changeState("Buffering")
- end
- -- don't start playing live streams
immediately as it causes stutter
- if rtmpMessages["subscribe"] and
rtmp["timestamp"] < 4500 then
- return 0
- else
- changeState("Playing")
- end
- end
-
- local n = stream:feedFromLua(outBuf)
- if n > 0 then
- outBuf = string.sub(outBuf, n + 1)
- end
-
- return n
- end,
-
- [18] = function(stream, rtmp)
- log:info("message type 18 - metadata")
-
- if rtmpMessages["meta"] == nil or
rtmpMessages["meta"] == "send" then
- slimproto:send({ opcode = "META", data =
rtmp["body"] })
- end
- end,
-
- [20] = function(stream, rtmp)
- log:info("message type 20")
-
- if rtmpMessages["meta"] == nil or
rtmpMessages["meta"] == "send" then
- slimproto:send({ opcode = "META", data =
rtmp["body"] })
- end
-
- if string.match(rtmp["body"], "_result") then
-
- if state == "sentConnect" then
-
- log:info("sending createStream")
- sendRtmp(stream,
rtmpMessages["create"])
- changeState("sentCreateStream")
-
- elseif state == "sentCreateStream" then
-
- if rtmpMessages["subscribe"] then
- log:info("sending
FCSubscribe")
- sendRtmp(stream,
rtmpMessages["subscribe"])
-
changeState("sentFCSubscribe")
- else
- log:info("sending play")
- sendRtmp(stream,
rtmpMessages["play"])
- changeState("sentPlay")
- end
- end
-
- elseif string.match(rtmp["body"], "_error") then
-
- log:warn("stream error")
- return nil, true
-
- elseif string.match(rtmp["body"], "onFCSubscribe")
then
-
- log:info("sending play")
- sendRtmp(stream, rtmpMessages["play"])
- changeState("sentPlay")
-
- elseif string.match(rtmp["body"], "onStatus") then
-
- log:info("onStatus")
-
- local error =
- string.match(rtmp["body"],
"NetStream%.Failed") or
- string.match(rtmp["body"],
"NetStream%.Play%.Failed") or
- string.match(rtmp["body"],
"NetStream%.Play%.StreamNotFound") or
- string.match(rtmp["body"],
"NetConnection%.Connect%.InvalidApp") or
- string.match(rtmp["body"],
"NetStream%.Play%.Complete") or
- string.match(rtmp["body"],
"NetStream%.Play%.Stop")
-
- if error then
- return nil, true
- end
-
- end
-
- end,
-}
-
-
-function read(stream)
-
- local readmore = true
- local n = 0
-
- while readmore do
-
- -- read new data, contraining the size of our input buffer
- -- without this check, fast servers can send us too much data
causing OOM conditions
- if string.len(inBuf) < 8192 then
-
- local new, error = stream:readToLua()
- if error then
- stream:setStreaming(false)
- stream:disconnect()
- return
- end
- if new then
- inBuf = inBuf .. new
- receivedBytes = receivedBytes + string.len(new)
- end
-
- end
-
- readmore = false
-
- local len = string.len(inBuf)
-
- -- handshake phase
- if (state == 'hsAwaitS0' or state == 'hsAwaitS1' or state ==
'hsAwaitS2') then
-
- local expect = hsHanders[state][1]
- local handler = hsHanders[state][2]
-
- if len >= expect then
-
- local packet = string.sub(inBuf, 1, expect)
- inBuf = string.sub(inBuf, expect + 1)
-
- local newstate = handler(stream, packet)
- if not newstate then
- stream:setStreaming(false)
- stream:disconnect()
- return
- end
-
- changeState(newstate)
-
- readmore = true
-
- end
-
- -- rtmp parsing phase
- elseif len > 0 then
-
- local header0 = string.byte(inBuf, 1)
- local chan = header0 & 0x3f
- local fmt = (header0 & 0xc0) >> 6
- local info, body
-
- if not inCaches[chan] then
- inCaches[chan] = {}
- end
- local inCache = inCaches[chan]
-
- if (chan == 0 or chan == 1) then
- log:error("rtmp chan > 63 - not supported")
- stream:setStreaming(false)
- stream:disconnect()
- return
- end
-
- if fmt == 0 and len >= 12 then
-
- local t0len = unpackNumber(inBuf, 5, 3)
- local read = math.min(t0len, recvChunkSize) + 12
-
- if len >= read then
-
- info = {
- chunkChan = chan,
- type = string.byte(inBuf,
8),
- timestamp = unpackNumber(inBuf,
2, 3),
- length = t0len,
- streamId = unpackNumber(inBuf,
9, 4, true)
- }
-
- local frag = string.sub(inBuf, 13, read)
- inBuf = string.sub(inBuf, read + 1)
-
- if read == t0len + 12 then
- body = frag
- else
- inCaches[chan] = {
- info = info,
- body = frag,
- remain = t0len + 12 -
read
- }
- inCache = inCache[chan]
- end
-
- readmore = true
- end
-
- elseif fmt == 1 and len >= 8 and inCache["info"] then
-
- local t1len = unpackNumber(inBuf, 5, 3)
- local read = math.min(t1len, recvChunkSize) + 8
-
- if len >= read then
-
- local delta = unpackNumber(inBuf, 2, 3)
-
- info = inCache["info"]
- info["type"] = string.byte(inBuf, 8)
- info["timestamp"] = info["timestamp"] +
delta
- info["length"] = t1len
- -- streamId is cached
- info["delta"] = delta
-
- local frag = string.sub(inBuf, 9, read)
- inBuf = string.sub(inBuf, read + 1)
-
- if read == t1len + 8 then
- body = frag
- else
- inCache["body"] = frag
- inCache["remain"] = t1len + 8 -
read
- end
-
- readmore = true
- end
-
- elseif fmt == 2 and inCache["info"] then
-
- local t2len = inCache["info"]["length"]
- local read = math.min(t2len, recvChunkSize) + 4
-
- if len >= read then
-
- local delta = unpackNumber(inBuf, 2, 3)
-
- info = inCache["info"]
- info["timestamp"] = info["timestamp"] +
delta
- -- type, length, streamId is cached
- info["delta"] = delta
-
- local frag = string.sub(inBuf, 5, read)
- inBuf = string.sub(inBuf, read + 1)
-
- if read == t2len + 4 then
- body = frag
- else
- inCache["body"] = frag
- inCache["remain"] = t2len + 4 -
read
- end
-
- readmore = true
- end
-
- elseif fmt == 3 and inCache["remain"] and
inCache["remain"] > 0 then
-
- local read = math.min(inCache["remain"],
recvChunkSize) + 1
-
- if len >= read then
-
- local frag = string.sub(inBuf, 2, read)
- inBuf = string.sub(inBuf, read + 1)
-
- inCache["body"] = inCache["body"] ..
frag
- inCache["remain"] = inCache["remain"] -
(read - 1)
-
- if inCache["remain"] == 0 then
- info = inCache["info"]
- body = inCache["body"]
- end
-
- readmore = true
- end
-
- elseif fmt == 3 and inCache["info"] then
-
- local t3len = inCache["info"]["length"]
- local read = math.min(t3len, recvChunkSize) + 1
-
- if len >= read then
-
- info = inCache["info"]
- info["timestamp"] = info["timestamp"] +
info["delta"]
- -- type, length, streamId is cached
-
- local frag = string.sub(inBuf, 2, read)
- inBuf = string.sub(inBuf, read + 1)
-
- if read == t3len + 1 then
- body = frag
- else
- inCache["body"] = frag
- inCache["remain"] = t3len + 1 -
read
- end
-
- readmore = true
- end
-
- end
-
- if body then
- local rtmp = info
- inCache["info"] = info
- rtmp["body"] = body
-
- local handler = rtmpHandlers[ rtmp["type"] ]
- if handler then
- local ret, error = handler(stream, rtmp)
- if error then
- stream:setStreaming(false)
- stream:disconnect()
- return ret
- end
- if ret then
- n = n + ret
- end
- else
- log:warn("unhandled rtmp packet, type:
", info["type"])
- end
- end
-
- end
-
- if receivedBytes > nextAck then
- log:info("sending ack")
- sendRtmp(stream,
- string.char(0x02,
-- chan 2, format 0
- 0x00, 0x00,
0x00, -- timestamp (not implemented)
- 0x00, 0x00,
0x04, -- length
- 0x03,
-- type 0x03
- 0x00, 0x00,
0x00, 0x00, -- streamId 0
- (receivedBytes
& 0xFF000000) >> 24,
- (receivedBytes
& 0x00FF0000) >> 16,
- (receivedBytes
& 0x0000FF00) >> 8,
- (receivedBytes
& 0x000000FF)
- ))
- nextAck = nextAck + ackWindow
- end
-
- end
-
- return n
-
-end
-
-
+ -- start the rtmp protocol by sending the handshake (implemented in C)
+ return rtmpC.sendHandshake(stream);
+end
+
+
+-- called from C to send RESP message back to server
+function streamStartEvent()
+ if rtmpMessages["meta"] ~= nil then
+ slimproto:send({ opcode = "RESP", headers = "" })
+ end
+end
+
+
+-- called from C to send Metadata back to server
+function sendMeta(msg)
+ if rtmpMessages["meta"] == nil or rtmpMessages["meta"] == "send" then
+ slimproto:send({ opcode = "META", data = msg })
+ end
+end
+
+
+---------------------------------------------------------------------------------------------------------------------
-- amf packet formatting and serialisation code
-- this is added for api version 2 so we don't rely on server code to generate serialsed amf0
+---------------------------------------------------------------------------------------------------------------------
+
+local function packNumber(v, len, le)
+ local t = {}
+ for i = 1, len do
+ t[#t + 1] = string.char(v & 0xFF)
+ v = v >> 8
+ end
+ local str = table.concat(t)
+ return le and str or string.reverse(str)
+end
+
-- emulate pack "d" to serialise numbers as doubles
-- this only implements most signficant 28 bits of the mantissa, rest are 0
@@ -715,7 +173,7 @@
function formatRtmp(chan, type, streamId, body)
return
string.char(chan & 0x7f) .. -- chan X, format 0
- string.char(0x00, 0x00, 0x00) .. -- timestamp (not implemented)
+ string.char(0x00, 0x00, 0x00) .. -- timestamp
packNumber(string.len(body), 3) .. -- length
string.char(type & 0xff) .. -- type
packNumber(streamId, 4, true) .. -- streamId
Modified: 7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h
URL:
http://svn.slimdevices.com/jive/7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h?rev=9339&r1=9338&r2=9339&view=diff
==============================================================================
--- 7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h (original)
+++ 7.6/trunk/squeezeplay/src/squeezeplay/src/audio/streambuf.h Sat Mar 5 03:04:13 2011
@@ -37,6 +37,8 @@
extern void streambuf_flush(void);
+extern void streambuf_feed(u8_t *buf, size_t size);
+
/* the mutex should be locked when using fast read */
extern size_t streambuf_fast_read(u8_t *buf, size_t min, size_t max, bool_t *streaming);
_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/mailman/listinfo/jive-checkins