Author: adrian
Date: Thu Nov 17 12:45:50 2011
New Revision: 9555

URL: http://svn.slimdevices.com/jive?rev=9555&view=rev
Log:
Bug: 17760
Description: treat rtmp stream end differently from stream errors and reset 
streaming state so that stream end causes the server to move onto the next track

Modified:
    7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c

Modified: 7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c
URL: 
http://svn.slimdevices.com/jive/7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c?rev=9555&r1=9554&r2=9555&view=diff
==============================================================================
--- 7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c (original)
+++ 7.7/trunk/squeezeplay/src/luartmp-squeezeplay/rtmp.c Thu Nov 17 12:45:50 
2011
@@ -47,6 +47,8 @@
 
 #define INCACHE_SLOTS      8 // slots for caching state for concurrent active 
rtmp chunk channels - set to be higher than seen, which is 5
 #define BUFFER_UNTIL_TS 4500 // on stream start only transition to playing 
state after this timestamp, avoids startup rebuffer
+
+typedef enum { STREAM_OK, STREAM_END, STREAM_ERROR } stream_status; 
 
 typedef enum {
        RTMP_IDLE = 0, RTMP_AWAIT_S0 = 1, RTMP_AWAIT_S1 = 2, RTMP_AWAIT_S2 = 3, 
@@ -230,36 +232,36 @@
 // rtmp packet handlers - return false for error to force stream close
 
 // receive chunk size handler
-bool messageType1(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType1(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        recv_chunksize = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 | 
*(buf+3);
        LOG_INFO(log_audio_decode, "message type 1 - set recv chunk size to: 
%d", recv_chunksize);
-       return true;
+       return STREAM_OK;
 }
 
 // abort channel handler
-bool messageType2(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType2(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        LOG_INFO(log_audio_decode, "message type 2 - abort for chunk channel: 
%d", entry->chan);
        if (entry->buf) {
                free(entry->buf);
        }
        memset(entry, 0, sizeof(struct incache_entry));
-       return true;
+       return STREAM_OK;
 }
 
 // ack received handler
-bool messageType3(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType3(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        LOG_INFO(log_audio_decode, "message type 3 - ack received");
-       return true;
+       return STREAM_OK;
 }
 
 // user control message handler
-bool messageType4(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType4(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        unsigned event = *(buf) << 8 | *(buf+1);
        u8_t *data = buf + 2;
        switch (event) {
        case 0: LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 0: Stream Begin"); break;
        case 1: LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 1: EOF - exiting"); 
-               return false;
+               return STREAM_END;
                break;
        case 2: LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 2: Stream Dry"); break;
        case 4: LOG_INFO(log_audio_decode, "message type 4 - user control 
message event 4: Stream Is Recorded"); break;
@@ -287,18 +289,18 @@
 
        default: LOG_DEBUG(log_audio_decode, "message type 4 - user control 
message event %d: ignored", event);
        }
-       return true;
+       return STREAM_OK;
 }
 
 // window ack size handler
-bool messageType5(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType5(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        unsigned window = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 | 
*(buf+3);
        LOG_INFO(log_audio_decode, "message type 5 - window ack size: %d - 
ignored", window);
-       return true;
+       return STREAM_OK;
 }
 
 // set window size handler
-bool messageType6(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType6(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        struct stream *stream = lua_touserdata(L, 1);
        unsigned window = *(buf) << 24 |*(buf+1) << 16 | *(buf+2) << 8 | 
*(buf+3);
        unsigned limit  = *(buf+4);
@@ -321,11 +323,11 @@
        send_rtmp(stream->fd, packet, 16);
 
        ack_wind = window / 2;
-       return true;
+       return STREAM_OK;
 }
 
 // audio packet handler
-bool messageType8(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType8(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        int n = streambuf_get_freebytes();
        
        if (*buf == 0xAF) {
@@ -345,7 +347,7 @@
                                streambuf_feed(buf + 2, entry->len - 2);
                        } else {
                                LOG_ERROR(log_audio_decode, "panic - not enough 
space in streambuf - case not handled by implementation");
-                               return false;
+                               return STREAM_ERROR;
                        }
                        
                } else if (*(buf + 1) == 0x00) {
@@ -370,7 +372,7 @@
                        streambuf_feed(buf + 1, entry->len - 1);
                } else {
                        LOG_ERROR(log_audio_decode, "panic - not enough space 
in streambuf - case not handled by implementation");
-                       return false;
+                       return STREAM_ERROR;
                }
        }
        
@@ -403,11 +405,11 @@
                }
        }
 
-       return true;
+       return STREAM_OK;
 }
 
 // metadata handler
-bool messageType18(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType18(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        LOG_INFO(log_audio_decode, "message type 18 - meta");
 
        // send to server for debug
@@ -419,7 +421,7 @@
        if (lua_pcall(L, 1, 0, 0) != 0) {
                fprintf(stderr, "error running sendMeta: %s\n", lua_tostring(L, 
-1));
        }
-       return true;
+       return STREAM_OK;
 }
 
 // helper for messageType20 which returns true if string exists anywhere 
within buf
@@ -440,7 +442,7 @@
 }
 
 // message type 20
-bool messageType20(lua_State *L, u8_t *buf, struct incache_entry *entry) {
+stream_status messageType20(lua_State *L, u8_t *buf, struct incache_entry 
*entry) {
        LOG_INFO(log_audio_decode, "message type 20");
 
        // send packet to server for debug
@@ -481,7 +483,7 @@
        } else if (bufmatch(buf, entry->len, "_error")) {
 
                LOG_WARN(log_audio_decode, "stream error");
-               return false;
+               return STREAM_ERROR;
                
        } else if (bufmatch(buf, entry->len, "onFCSubscribe")) {
                
@@ -492,21 +494,27 @@
        } else if (bufmatch(buf, entry->len, "onStatus")) {
                
                LOG_INFO(log_audio_decode, "onStatus");
+
+               if (bufmatch(buf, entry->len, "NetStream.Play.Complete") ||
+                       bufmatch(buf, entry->len, "NetStream.Play.Stop")) {
+                       
+                       LOG_INFO(log_audio_decode, "stream ended - closing 
stream");
+                       
+                       return STREAM_END;
+               }       
                
                if (bufmatch(buf, entry->len, "NetStream.Failed") ||
                        bufmatch(buf, entry->len, "NetStream.Play.Failed") ||
                        bufmatch(buf, entry->len, 
"NetStream.Play.StreamNotFound") ||
-                       bufmatch(buf, entry->len, 
"NetConnection.Connect.InvalidApp") ||
-                       bufmatch(buf, entry->len, "NetStream.Play.Complete") ||
-                       bufmatch(buf, entry->len, "NetStream.Play.Stop")) {
+                       bufmatch(buf, entry->len, 
"NetConnection.Connect.InvalidApp")) {
                        
                        LOG_WARN(log_audio_decode, "error status received - 
closing stream");
                        
-                       return false;
+                       return STREAM_ERROR;
                }       
        }
 
-       return true;
+       return STREAM_OK;
 }
 
 int readL(lua_State *L) {
@@ -547,6 +555,7 @@
                        } else {
                                LOG_ERROR(log_audio_decode, "socket closed, 
%s", strerror(SOCKETERROR));
                                CLOSESOCKET(stream->fd);
+                               streambuf_set_streaming(FALSE);
                                lua_pushnil(L);
                                lua_pushstring(L, strerror(SOCKETERROR));
                                return 2;
@@ -585,6 +594,7 @@
                                } else {
                                        LOG_ERROR(log_audio_decode, "bad 
handshake token");
                                        CLOSESOCKET(stream->fd);
+                                       streambuf_set_streaming(FALSE);
                                        lua_pushnil(L);
                                        lua_pushstring(L, "bad handshake 
token");
                                        return 2;
@@ -735,28 +745,38 @@
                        
                        if (dpos || reasembled) {
                                u8_t *buf = dpos ? dpos : entry->buf;
-                               bool ok = true;
+                               stream_status status;
 
                                switch(entry->type) {
-                               case  1: ok = messageType1(L, buf, entry); 
break;
-                               case  2: ok = messageType2(L, buf, entry); 
break;
-                               case  3: ok = messageType3(L, buf, entry); 
break;
-                               case  4: ok = messageType4(L, buf, entry); 
break;
-                               case  5: ok = messageType5(L, buf, entry); 
break;
-                               case  6: ok = messageType6(L, buf, entry); 
break;
-                               case  8: ok = messageType8(L, buf, entry); 
break;
-                               case 18: ok = messageType18(L, buf, entry); 
break;
-                               case 20: ok = messageType20(L, buf, entry); 
break;
+                               case  1: status = messageType1(L, buf, entry); 
break;
+                               case  2: status = messageType2(L, buf, entry); 
break;
+                               case  3: status = messageType3(L, buf, entry); 
break;
+                               case  4: status = messageType4(L, buf, entry); 
break;
+                               case  5: status = messageType5(L, buf, entry); 
break;
+                               case  6: status = messageType6(L, buf, entry); 
break;
+                               case  8: status = messageType8(L, buf, entry); 
break;
+                               case 18: status = messageType18(L, buf, entry); 
break;
+                               case 20: status = messageType20(L, buf, entry); 
break;
                                default:
+                                       status = STREAM_OK;
                                        LOG_DEBUG(log_audio_decode, "unhandled 
rtmp packet type: %d", entry->type);
                                }
 
-                               if (!ok) {
-                                       LOG_ERROR(log_audio_decode, "handler 
returned false - closing stream");
+                               if (status != STREAM_OK) {
+
                                        CLOSESOCKET(stream->fd);
-                                       lua_pushnil(L);
-                                       lua_pushstring(L, "handler returned 
false - closing stream");
-                                       return 2;
+                                       streambuf_set_streaming(FALSE);
+
+                                       if (status == STREAM_END) {
+                                               LOG_INFO(log_audio_decode, "end 
of stream");
+                                               lua_pushboolean(L, FALSE);
+                                               return 1;
+                                       } else if (status == STREAM_ERROR) {
+                                               LOG_WARN(log_audio_decode, 
"handler returned false - closing stream");
+                                               lua_pushnil(L);
+                                               lua_pushstring(L, "stream error 
- closing stream");
+                                               return 2;
+                                       }
                                }
                        }
                }

_______________________________________________
Jive-checkins mailing list
[email protected]
http://lists.slimdevices.com/mailman/listinfo/jive-checkins

Reply via email to