---
 libavformat/rtmpproto.c | 285 ++++++++++++++++++++++++++----------------------
 1 file changed, 153 insertions(+), 132 deletions(-)

diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index 19b6874..9659e4f 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -52,15 +52,17 @@
 typedef enum {
     STATE_START,      ///< client has not done anything yet
     STATE_HANDSHAKED, ///< client has performed handshake
-    STATE_RELEASING,  ///< client releasing stream before publish it (for 
output)
     STATE_FCPUBLISH,  ///< client FCPublishing stream (for output)
-    STATE_CONNECTING, ///< client connected to server successfully
-    STATE_READY,      ///< client has sent all needed commands and waits for 
server reply
     STATE_PLAYING,    ///< client has started receiving multimedia data from 
server
     STATE_PUBLISHING, ///< client has started sending multimedia data to 
server (for output)
     STATE_STOPPED,    ///< the broadcast has been stopped
 } ClientState;
 
+typedef struct TrackedMethod {
+    char *name;
+    int id;
+} TrackedMethod;
+
 /** protocol handler context */
 typedef struct RTMPContext {
     const AVClass *class;
@@ -86,7 +88,6 @@ typedef struct RTMPContext {
     uint8_t       flv_header[11];             ///< partial incoming flv packet 
header
     int           flv_header_bytes;           ///< number of initialized bytes 
in flv_header
     int           nb_invokes;                 ///< keeps track of invoke 
messages
-    int           create_stream_invoke;       ///< invoke id for the create 
stream command
     char*         tcurl;                      ///< url of the target stream
     char*         flashver;                   ///< version of the flash plugin
     char*         swfurl;                     ///< url of the swf player
@@ -96,6 +97,9 @@ typedef struct RTMPContext {
     int           client_buffer_time;         ///< client buffer time in ms
     int           flush_interval;             ///< number of packets flushed 
in the same request (RTMPT only)
     int           encrypted;                  ///< use an encrypted connection 
(RTMPE only)
+    TrackedMethod*tracked_methods;            ///< tracked methods buffer
+    int           nb_tracked_methods;         ///< number of tracked methods
+    int           tracked_methods_size;       ///< size of the tracked methods 
buffer
 } RTMPContext;
 
 #define PLAYER_KEY_OPEN_PART_LEN 30   ///< length of partial key used for 
first client digest signing
@@ -121,6 +125,69 @@ static const uint8_t rtmp_server_key[] = {
     0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
 };
 
+static int add_tracked_method(RTMPContext *rt, const char *name, int id)
+{
+    void *ptr;
+
+    if (rt->nb_tracked_methods + 1 > rt->tracked_methods_size) {
+        rt->tracked_methods_size = (rt->nb_tracked_methods + 1) * 2;
+        ptr = av_realloc(rt->tracked_methods,
+                         rt->tracked_methods_size * 
sizeof(*rt->tracked_methods));
+        if (!ptr)
+            return AVERROR(ENOMEM);
+        rt->tracked_methods = ptr;
+    }
+
+    rt->tracked_methods[rt->nb_tracked_methods].name = strdup(name);
+    rt->tracked_methods[rt->nb_tracked_methods++].id = id;
+
+    return 0;
+}
+
+static void del_tracked_method(RTMPContext *rt, int index)
+{
+    size_t membersize = sizeof(*rt->tracked_methods);
+    void *addr = rt->tracked_methods + membersize * index;
+    void *next = addr + membersize;
+    int nb = membersize * (rt->nb_tracked_methods - index - 1);
+
+    memmove(addr, next, nb);
+    rt->nb_tracked_methods--;
+}
+
+static void free_tracked_methods(RTMPContext *rt)
+{
+    int i;
+
+    for (i = 0; i < rt->nb_tracked_methods; i ++)
+        av_free(rt->tracked_methods[i].name);
+    av_free(rt->tracked_methods);
+}
+
+static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
+{
+    int ret;
+
+    if (pkt->type == RTMP_PT_INVOKE && track) {
+        GetByteContext gbc;
+        char name[128];
+        int len;
+
+        bytestream2_init(&gbc, pkt->data, pkt->data_size);
+        if ((ret = ff_amf_read_string(&gbc, name, sizeof(name), &len)) < 0)
+            return ret;
+
+        if ((ret = add_tracked_method(rt, name, rt->nb_invokes)) < 0)
+            return ret;
+    }
+
+    ret = ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size,
+                               rt->prev_pkt[1]);
+    ff_rtmp_packet_destroy(pkt);
+
+    return ret;
+}
+
 static int rtmp_write_amf_data(URLContext *s, char *param, uint8_t **p)
 {
     char *field, *value;
@@ -269,11 +336,7 @@ static int gen_connect(URLContext *s, RTMPContext *rt)
 
     pkt.data_size = p - pkt.data;
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 1);
 }
 
 /**
@@ -297,11 +360,7 @@ static int gen_release_stream(URLContext *s, RTMPContext 
*rt)
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, rt->playpath);
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 /**
@@ -325,11 +384,7 @@ static int gen_fcpublish_stream(URLContext *s, RTMPContext 
*rt)
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, rt->playpath);
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 /**
@@ -353,11 +408,7 @@ static int gen_fcunpublish_stream(URLContext *s, 
RTMPContext *rt)
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, rt->playpath);
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 /**
@@ -380,13 +431,8 @@ static int gen_create_stream(URLContext *s, RTMPContext 
*rt)
     ff_amf_write_string(&p, "createStream");
     ff_amf_write_number(&p, ++rt->nb_invokes);
     ff_amf_write_null(&p);
-    rt->create_stream_invoke = rt->nb_invokes;
-
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
 
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 1);
 }
 
 
@@ -412,11 +458,7 @@ static int gen_delete_stream(URLContext *s, RTMPContext 
*rt)
     ff_amf_write_null(&p);
     ff_amf_write_number(&p, rt->main_channel_id);
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 /**
@@ -437,11 +479,7 @@ static int gen_buffer_time(URLContext *s, RTMPContext *rt)
     bytestream_put_be32(&p, rt->main_channel_id);
     bytestream_put_be32(&p, rt->client_buffer_time);
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 /**
@@ -469,11 +507,7 @@ static int gen_play(URLContext *s, RTMPContext *rt)
     ff_amf_write_string(&p, rt->playpath);
     ff_amf_write_number(&p, rt->live);
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 1);
 }
 
 /**
@@ -500,11 +534,7 @@ static int gen_publish(URLContext *s, RTMPContext *rt)
     ff_amf_write_string(&p, rt->playpath);
     ff_amf_write_string(&p, "live");
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 1);
 }
 
 /**
@@ -529,11 +559,8 @@ static int gen_pong(URLContext *s, RTMPContext *rt, 
RTMPPacket *ppkt)
     p = pkt.data;
     bytestream_put_be16(&p, 7);
     bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
 
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 /**
@@ -551,11 +578,8 @@ static int gen_server_bw(URLContext *s, RTMPContext *rt)
 
     p = pkt.data;
     bytestream_put_be32(&p, rt->server_bw);
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
 
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 /**
@@ -576,11 +600,7 @@ static int gen_check_bw(URLContext *s, RTMPContext *rt)
     ff_amf_write_number(&p, ++rt->nb_invokes);
     ff_amf_write_null(&p);
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 /**
@@ -598,11 +618,8 @@ static int gen_bytes_read(URLContext *s, RTMPContext *rt, 
uint32_t ts)
 
     p = pkt.data;
     bytestream_put_be32(&p, rt->bytes_read);
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
 
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 0);
 }
 
 static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
@@ -622,11 +639,7 @@ static int gen_fcsubscribe_stream(URLContext *s, 
RTMPContext *rt,
     ff_amf_write_null(&p);
     ff_amf_write_string(&p, subscribe);
 
-    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
-                               rt->prev_pkt[1]);
-    ff_rtmp_packet_destroy(&pkt);
-
-    return ret;
+    return rtmp_send_packet(rt, &pkt, 1);
 }
 
 int ff_rtmp_calc_digest(const uint8_t *src, int len, int gap,
@@ -1021,68 +1034,77 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt)
             av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr);
         return -1;
     } else if (!memcmp(pkt->data, "\002\000\007_result", 10)) {
-        switch (rt->state) {
-            case STATE_HANDSHAKED:
-                if (!rt->is_input) {
-                    if ((ret = gen_release_stream(s, rt)) < 0)
-                        return ret;
-                    if ((ret = gen_fcpublish_stream(s, rt)) < 0)
-                        return ret;
-                    rt->state = STATE_RELEASING;
-                } else {
-                    if ((ret = gen_server_bw(s, rt)) < 0)
-                        return ret;
-                    rt->state = STATE_CONNECTING;
-                }
-                if ((ret = gen_create_stream(s, rt)) < 0)
+        char *tracked_method = NULL;
+        GetByteContext gbc;
+        double pkt_id;
+
+        bytestream2_init(&gbc, pkt->data + 10, pkt->data_size);
+        if ((ret = ff_amf_read_number(&gbc, &pkt_id)) < 0)
+            return ret;
+
+        for (i = 0; i < rt->nb_tracked_methods; i++) {
+            if (rt->tracked_methods[i].id != pkt_id)
+                continue;
+
+            tracked_method = rt->tracked_methods[i].name;
+            del_tracked_method(rt, i);
+            break;
+        }
+
+        if (!tracked_method) {
+            /* Ignore this reply when the current method is not tracked. */
+            return 0;
+        }
+
+        if (!memcmp(tracked_method, "connect", 7)) {
+            av_free(tracked_method);
+
+            if (!rt->is_input) {
+                if ((ret = gen_release_stream(s, rt)) < 0)
                     return ret;
 
-                if (rt->is_input) {
-                    /* Send the FCSubscribe command when the name of live
-                     * stream is defined by the user or if it's a live stream. 
*/
-                    if (rt->subscribe) {
-                        if ((ret = gen_fcsubscribe_stream(s, rt,
-                                                          rt->subscribe)) < 0)
-                            return ret;
-                    } else if (rt->live == -1) {
-                        if ((ret = gen_fcsubscribe_stream(s, rt,
-                                                          rt->playpath)) < 0)
-                            return ret;
-                    }
-                }
-                break;
-            case STATE_FCPUBLISH:
-                rt->state = STATE_CONNECTING;
-                break;
-            case STATE_RELEASING:
-                rt->state = STATE_FCPUBLISH;
-                /* hack for Wowza Media Server, it does not send result for
-                 * releaseStream and FCPublish calls */
-                if (!pkt->data[10]) {
-                    int pkt_id = av_int2double(AV_RB64(pkt->data + 11));
-                    if (pkt_id == rt->create_stream_invoke)
-                        rt->state = STATE_CONNECTING;
-                }
-                if (rt->state != STATE_CONNECTING)
-                    break;
-            case STATE_CONNECTING:
-                //extract a number from the result
-                if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
-                    av_log(s, AV_LOG_WARNING, "Unexpected reply on 
connect()\n");
-                } else {
-                    rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 
21));
-                }
-                if (rt->is_input) {
-                    if ((ret = gen_play(s, rt)) < 0)
-                        return ret;
-                    if ((ret = gen_buffer_time(s, rt)) < 0)
+                if ((ret = gen_fcpublish_stream(s, rt)) < 0)
+                    return ret;
+            } else {
+                if ((ret = gen_server_bw(s, rt)) < 0)
+                    return ret;
+            }
+
+            if ((ret = gen_create_stream(s, rt)) < 0)
+                return ret;
+
+            if (rt->is_input) {
+                /* Send the FCSubscribe command when the name of live
+                 * stream is defined by the user or if it's a live stream. */
+                if (rt->subscribe) {
+                    if ((ret = gen_fcsubscribe_stream(s, rt,
+                                                      rt->subscribe)) < 0)
                         return ret;
-                } else {
-                    if ((ret = gen_publish(s, rt)) < 0)
+                } else if (rt->live == -1) {
+                    if ((ret = gen_fcsubscribe_stream(s, rt,
+                                                      rt->playpath)) < 0)
                         return ret;
                 }
-                rt->state = STATE_READY;
-                break;
+            }
+        } else if (!memcmp(tracked_method, "createStream", 12)) {
+            av_free(tracked_method);
+
+            //extract a number from the result
+            if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
+                av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n");
+            } else {
+                rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21));
+            }
+
+            if (!rt->is_input) {
+                if ((ret = gen_publish(s, rt)) < 0)
+                    return ret;
+            } else {
+                if ((ret = gen_play(s, rt)) < 0)
+                    return ret;
+                if ((ret = gen_buffer_time(s, rt)) < 0)
+                    return ret;
+            }
         }
     } else if (!memcmp(pkt->data, "\002\000\010onStatus", 11)) {
         const uint8_t* ptr = pkt->data + 11;
@@ -1283,6 +1305,7 @@ static int rtmp_close(URLContext *h)
     if (rt->state > STATE_HANDSHAKED)
         ret = gen_delete_stream(h, rt);
 
+    free_tracked_methods(rt);
     av_freep(&rt->flv_data);
     ffurl_close(rt->stream);
     return ret;
@@ -1570,10 +1593,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, 
int size)
         if (rt->flv_off == rt->flv_size) {
             rt->skip_bytes = 4;
 
-            if ((ret = ff_rtmp_packet_write(rt->stream, &rt->out_pkt,
-                                            rt->chunk_size, rt->prev_pkt[1])) 
< 0)
+            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
                 return ret;
-            ff_rtmp_packet_destroy(&rt->out_pkt);
             rt->flv_size = 0;
             rt->flv_off = 0;
             rt->flv_header_bytes = 0;
-- 
1.7.11.1

_______________________________________________
libav-devel mailing list
libav-devel@libav.org
https://lists.libav.org/mailman/listinfo/libav-devel

Reply via email to