Author: adrian
Date: Thu Nov 17 12:45:50 2011
New Revision: 9555
URL: http://svn.slimdevices.com/jive?rev=9555&view=rev
Log:
Bug: 17760
Description: treat rtmp stream end differently from stream errors and reset
streaming state so that stream end causes the server to move onto the next track
Modified:
7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c
Modified: 7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c
URL:
http://svn.slimdevices.com/jive/7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c?rev=9555&r1=9554&r2=9555&view=diff
==============================================================================
--- 7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c (original)
+++ 7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c Thu Nov 17 12:45:50
2011
@@ -47,6 +47,8 @@
#define INCACHE_SLOTS 8 // slots for caching state for concurrent active
rtmp chunk channels - set to be higher than seen, which is 5
#define BUFFER_UNTIL_TS 4500 // on stream start only transition to playing
state after this timestamp, avoids startup rebuffer
+
+typedef enum { STREAM_OK, STREAM_END, STREAM_ERROR } stream_status;
typedef enum {
RTMP_IDLE = 0, RTMP_AWAIT_S0 = 1, RTMP_AWAIT_S1 = 2, RTMP_AWAIT_S2 = 3,
@@ -230,36 +232,36 @@
// rtmp packet handlers - return false for error to force stream close
// receive chunk size handler
-bool messageType1(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType1(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
recv_chunksize = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 |
*(buf+3);
LOG_INFO(log_audio_decode, "message type 1 - set recv chunk size to:
%d", recv_chunksize);
- return true;
+ return STREAM_OK;
}
// abort channel handler
-bool messageType2(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType2(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
LOG_INFO(log_audio_decode, "message type 2 - abort for chunk channel:
%d", entry->chan);
if (entry->buf) {
free(entry->buf);
}
memset(entry, 0, sizeof(struct incache_entry));
- return true;
+ return STREAM_OK;
}
// ack received handler
-bool messageType3(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType3(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
LOG_INFO(log_audio_decode, "message type 3 - ack received");
- return true;
+ return STREAM_OK;
}
// user control message handler
-bool messageType4(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType4(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
unsigned event = *(buf) << 8 | *(buf+1);
u8_t *data = buf + 2;
switch (event) {
case 0: LOG_INFO(log_audio_decode, "message type 4 - user control
message event 0: Stream Begin"); break;
case 1: LOG_INFO(log_audio_decode, "message type 4 - user control
message event 1: EOF - exiting");
- return false;
+ return STREAM_END;
break;
case 2: LOG_INFO(log_audio_decode, "message type 4 - user control
message event 2: Stream Dry"); break;
case 4: LOG_INFO(log_audio_decode, "message type 4 - user control
message event 4: Stream Is Recorded"); break;
@@ -287,18 +289,18 @@
default: LOG_DEBUG(log_audio_decode, "message type 4 - user control
message event %d: ignored", event);
}
- return true;
+ return STREAM_OK;
}
// window ack size handler
-bool messageType5(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType5(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
unsigned window = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 |
*(buf+3);
LOG_INFO(log_audio_decode, "message type 5 - window ack size: %d -
ignored", window);
- return true;
+ return STREAM_OK;
}
// set window size handler
-bool messageType6(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType6(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
struct stream *stream = lua_touserdata(L, 1);
unsigned window = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 |
*(buf+3);
unsigned limit = *(buf+4);
@@ -321,11 +323,11 @@
send_rtmp(stream->fd, packet, 16);
ack_wind = window / 2;
- return true;
+ return STREAM_OK;
}
// audio packet handler
-bool messageType8(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType8(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
int n = streambuf_get_freebytes();
if (*buf == 0xAF) {
@@ -345,7 +347,7 @@
streambuf_feed(buf + 2, entry->len - 2);
} else {
LOG_ERROR(log_audio_decode, "panic - not enough
space in streambuf - case not handled by implementation");
- return false;
+ return STREAM_ERROR;
}
} else if (*(buf + 1) == 0x00) {
@@ -370,7 +372,7 @@
streambuf_feed(buf + 1, entry->len - 1);
} else {
LOG_ERROR(log_audio_decode, "panic - not enough space
in streambuf - case not handled by implementation");
- return false;
+ return STREAM_ERROR;
}
}
@@ -403,11 +405,11 @@
}
}
- return true;
+ return STREAM_OK;
}
// metadata handler
-bool messageType18(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType18(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
LOG_INFO(log_audio_decode, "message type 18 - meta");
// send to server for debug
@@ -419,7 +421,7 @@
if (lua_pcall(L, 1, 0, 0) != 0) {
fprintf(stderr, "error running sendMeta: %s\n", lua_tostring(L,
-1));
}
- return true;
+ return STREAM_OK;
}
// helper for messageType20 which returns true if string exists anywhere
within buf
@@ -440,7 +442,7 @@
}
// message type 20
-bool messageType20(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType20(lua_State *L, u8_t *buf, struct incache_entry
*entry) {
LOG_INFO(log_audio_decode, "message type 20");
// send packet to server for debug
@@ -481,7 +483,7 @@
} else if (bufmatch(buf, entry->len, "_error")) {
LOG_WARN(log_audio_decode, "stream error");
- return false;
+ return STREAM_ERROR;
} else if (bufmatch(buf, entry->len, "onFCSubscribe")) {
@@ -492,21 +494,27 @@
} else if (bufmatch(buf, entry->len, "onStatus")) {
LOG_INFO(log_audio_decode, "onStatus");
+
+ if (bufmatch(buf, entry->len, "NetStream.Play.Complete") ||
+ bufmatch(buf, entry->len, "NetStream.Play.Stop")) {
+
+ LOG_INFO(log_audio_decode, "stream ended - closing
stream");
+
+ return STREAM_END;
+ }
if (bufmatch(buf, entry->len, "NetStream.Failed") ||
bufmatch(buf, entry->len, "NetStream.Play.Failed") ||
bufmatch(buf, entry->len,
"NetStream.Play.StreamNotFound") ||
- bufmatch(buf, entry->len,
"NetConnection.Connect.InvalidApp") ||
- bufmatch(buf, entry->len, "NetStream.Play.Complete") ||
- bufmatch(buf, entry->len, "NetStream.Play.Stop")) {
+ bufmatch(buf, entry->len,
"NetConnection.Connect.InvalidApp")) {
LOG_WARN(log_audio_decode, "error status received -
closing stream");
- return false;
+ return STREAM_ERROR;
}
}
- return true;
+ return STREAM_OK;
}
int readL(lua_State *L) {
@@ -547,6 +555,7 @@
} else {
LOG_ERROR(log_audio_decode, "socket closed,
%s", strerror(SOCKETERROR));
CLOSESOCKET(stream->fd);
+ streambuf_set_streaming(FALSE);
lua_pushnil(L);
lua_pushstring(L, strerror(SOCKETERROR));
return 2;
@@ -585,6 +594,7 @@
} else {
LOG_ERROR(log_audio_decode, "bad
handshake token");
CLOSESOCKET(stream->fd);
+ streambuf_set_streaming(FALSE);
lua_pushnil(L);
lua_pushstring(L, "bad handshake
token");
return 2;
@@ -735,28 +745,38 @@
if (dpos || reasembled) {
u8_t *buf = dpos ? dpos : entry->buf;
- bool ok = true;
+ stream_status status;
switch(entry->type) {
- case 1: ok = messageType1(L, buf, entry);
break;
- case 2: ok = messageType2(L, buf, entry);
break;
- case 3: ok = messageType3(L, buf, entry);
break;
- case 4: ok = messageType4(L, buf, entry);
break;
- case 5: ok = messageType5(L, buf, entry);
break;
- case 6: ok = messageType6(L, buf, entry);
break;
- case 8: ok = messageType8(L, buf, entry);
break;
- case 18: ok = messageType18(L, buf, entry);
break;
- case 20: ok = messageType20(L, buf, entry);
break;
+ case 1: status = messageType1(L, buf, entry);
break;
+ case 2: status = messageType2(L, buf, entry);
break;
+ case 3: status = messageType3(L, buf, entry);
break;
+ case 4: status = messageType4(L, buf, entry);
break;
+ case 5: status = messageType5(L, buf, entry);
break;
+ case 6: status = messageType6(L, buf, entry);
break;
+ case 8: status = messageType8(L, buf, entry);
break;
+ case 18: status = messageType18(L, buf, entry);
break;
+ case 20: status = messageType20(L, buf, entry);
break;
default:
+ status = STREAM_OK;
LOG_DEBUG(log_audio_decode, "unhandled
rtmp packet type: %d", entry->type);
}
- if (!ok) {
- LOG_ERROR(log_audio_decode, "handler
returned false - closing stream");
+ if (status != STREAM_OK) {
+
CLOSESOCKET(stream->fd);
- lua_pushnil(L);
- lua_pushstring(L, "handler returned
false - closing stream");
- return 2;
+ streambuf_set_streaming(FALSE);
+
+ if (status == STREAM_END) {
+ LOG_INFO(log_audio_decode, "end
of stream");
+ lua_pushboolean(L, FALSE);
+ return 1;
+ } else if (status == STREAM_ERROR) {
+ LOG_WARN(log_audio_decode,
"handler returned false - closing stream");
+ lua_pushnil(L);
+ lua_pushstring(L, "stream error
- closing stream");
+ return 2;
+ }
}
}
}
_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/mailman/listinfo/jive-checkins