--- libavformat/rtmpproto.c | 278 ++++++++++++++++++++++++++++-------------------- 1 file changed, 161 insertions(+), 117 deletions(-)
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c index 4a35319..6044425 100644 --- a/libavformat/rtmpproto.c +++ b/libavformat/rtmpproto.c @@ -52,15 +52,17 @@ typedef enum { STATE_START, ///< client has not done anything yet STATE_HANDSHAKED, ///< client has performed handshake - STATE_RELEASING, ///< client releasing stream before publish it (for output) STATE_FCPUBLISH, ///< client FCPublishing stream (for output) - STATE_CONNECTING, ///< client connected to server successfully - STATE_READY, ///< client has sent all needed commands and waits for server reply STATE_PLAYING, ///< client has started receiving multimedia data from server STATE_PUBLISHING, ///< client has started sending multimedia data to server (for output) STATE_STOPPED, ///< the broadcast has been stopped } ClientState; +typedef struct TrackedMethod { + char *name; + int id; +} TrackedMethod; + /** protocol handler context */ typedef struct RTMPContext { const AVClass *class; @@ -86,7 +88,6 @@ typedef struct RTMPContext { uint8_t flv_header[11]; ///< partial incoming flv packet header int flv_header_bytes; ///< number of initialized bytes in flv_header int nb_invokes; ///< keeps track of invoke messages - int create_stream_invoke; ///< invoke id for the create stream command char* tcurl; ///< url of the target stream char* flashver; ///< version of the flash plugin char* swfurl; ///< url of the swf player @@ -95,6 +96,9 @@ typedef struct RTMPContext { int client_buffer_time; ///< client buffer time in ms int flush_interval; ///< number of packets flushed in the same request (RTMPT only) int encrypted; ///< use an encrypted connection (RTMPE only) + TrackedMethod*tracked_methods; ///< tracked methods buffer + int nb_tracked_methods; ///< number of tracked methods + int tracked_methods_size; ///< size of the tracked methods buffer } RTMPContext; #define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing @@ -120,6 +124,103 @@ static const uint8_t rtmp_server_key[] = { 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE }; +static int ff_amf_read_string(GetByteContext *bsc, uint8_t *str, + int strsize, int *strlen) +{ + int stringlen = 0; + int readsize; + if (bytestream2_get_byte(bsc) != AMF_DATA_TYPE_STRING) + return AVERROR_INVALIDDATA; + stringlen = bytestream2_get_be16(bsc); + if (stringlen + 1 > strsize) + return AVERROR(EINVAL); + readsize = bytestream2_get_buffer(bsc, str, stringlen); + if (readsize != stringlen) { + av_log(NULL, AV_LOG_WARNING, + "ff_amf_read_string length differs from announced length\n"); + } + str[readsize] = '\0'; + *strlen = FFMIN(stringlen, readsize); + return 0; +} + +static int ff_amf_read_number(GetByteContext *bsc, double *val) +{ + uint64_t read; + if (bytestream2_get_byte(bsc) != AMF_DATA_TYPE_NUMBER) + return AVERROR_INVALIDDATA; + read = bytestream2_get_be64(bsc); + *val = av_int2double(read); + return 0; +} + +static int add_tracked_method(RTMPContext *rt, const char *name, int id) +{ + void *ptr; + + if (rt->nb_tracked_methods + 1 > rt->tracked_methods_size) { + rt->tracked_methods_size = (rt->nb_tracked_methods + 1) * 2; + ptr = av_realloc(rt->tracked_methods, + rt->tracked_methods_size * sizeof(*rt->tracked_methods)); + if (!ptr) + return AVERROR(ENOMEM); + rt->tracked_methods = ptr; + } + + rt->tracked_methods[rt->nb_tracked_methods].name = strdup(name); + rt->tracked_methods[rt->nb_tracked_methods++].id = id; + + return 0; +} + +static int find_tracked_method(RTMPContext *rt, int id, char **buf) +{ + int i; + + for (i = 0; i < rt->nb_tracked_methods; i++) { + if (rt->tracked_methods[i].id != id) + continue; + + *buf = rt->tracked_methods[i].name; + return 0; + } + + return 1; +} + +static void free_tracked_methods(RTMPContext *rt) +{ + int i; + + for (i = 0; i < rt->nb_tracked_methods; i ++) + av_free(rt->tracked_methods[i].name); + av_free(rt->tracked_methods); +} + +static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track) +{ + int ret; + + if (pkt->type == RTMP_PT_INVOKE && track) { + GetByteContext gbc; + char name[128]; + int len; + + bytestream2_init(&gbc, pkt->data, pkt->data_size); + if ((ret = ff_amf_read_string(&gbc, name, sizeof(name), &len)) < 0) + return ret; + + if ((ret = add_tracked_method(rt, name, rt->nb_invokes)) < 0) + return ret; + } + + ret = ff_rtmp_packet_write(rt->stream, pkt, rt->chunk_size, + rt->prev_pkt[1]); + ff_rtmp_packet_destroy(pkt); + + return ret; +} + static int rtmp_write_amf_data(URLContext *s, char *param, uint8_t **p) { char *field, *value; @@ -268,11 +369,7 @@ static int gen_connect(URLContext *s, RTMPContext *rt) pkt.data_size = p - pkt.data; - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 1); } /** @@ -296,11 +393,7 @@ static int gen_release_stream(URLContext *s, RTMPContext *rt) ff_amf_write_null(&p); ff_amf_write_string(&p, rt->playpath); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 0); } /** @@ -324,11 +417,7 @@ static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt) ff_amf_write_null(&p); ff_amf_write_string(&p, rt->playpath); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 0); } /** @@ -352,11 +441,7 @@ static int gen_fcunpublish_stream(URLContext *s, RTMPContext *rt) ff_amf_write_null(&p); ff_amf_write_string(&p, rt->playpath); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 0); } /** @@ -379,13 +464,8 @@ static int gen_create_stream(URLContext *s, RTMPContext *rt) ff_amf_write_string(&p, "createStream"); ff_amf_write_number(&p, ++rt->nb_invokes); ff_amf_write_null(&p); - rt->create_stream_invoke = rt->nb_invokes; - - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - return ret; + return rtmp_send_packet(rt, &pkt, 1); } @@ -411,11 +491,7 @@ static int gen_delete_stream(URLContext *s, RTMPContext *rt) ff_amf_write_null(&p); ff_amf_write_number(&p, rt->main_channel_id); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 0); } /** @@ -436,11 +512,7 @@ static int gen_buffer_time(URLContext *s, RTMPContext *rt) bytestream_put_be32(&p, rt->main_channel_id); bytestream_put_be32(&p, rt->client_buffer_time); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 0); } /** @@ -468,11 +540,7 @@ static int gen_play(URLContext *s, RTMPContext *rt) ff_amf_write_string(&p, rt->playpath); ff_amf_write_number(&p, rt->live); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 1); } /** @@ -499,11 +567,7 @@ static int gen_publish(URLContext *s, RTMPContext *rt) ff_amf_write_string(&p, rt->playpath); ff_amf_write_string(&p, "live"); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 1); } /** @@ -528,11 +592,8 @@ static int gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt) p = pkt.data; bytestream_put_be16(&p, 7); bytestream_put_be32(&p, AV_RB32(ppkt->data+2)); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - return ret; + return rtmp_send_packet(rt, &pkt, 0); } /** @@ -550,11 +611,8 @@ static int gen_server_bw(URLContext *s, RTMPContext *rt) p = pkt.data; bytestream_put_be32(&p, rt->server_bw); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - return ret; + return rtmp_send_packet(rt, &pkt, 0); } /** @@ -575,11 +633,7 @@ static int gen_check_bw(URLContext *s, RTMPContext *rt) ff_amf_write_number(&p, ++rt->nb_invokes); ff_amf_write_null(&p); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - - return ret; + return rtmp_send_packet(rt, &pkt, 0); } /** @@ -597,11 +651,8 @@ static int gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts) p = pkt.data; bytestream_put_be32(&p, rt->bytes_read); - ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size, - rt->prev_pkt[1]); - ff_rtmp_packet_destroy(&pkt); - return ret; + return rtmp_send_packet(rt, &pkt, 0); } int ff_rtmp_calc_digest(const uint8_t *src, int len, int gap, @@ -996,54 +1047,48 @@ static int handle_invoke(URLContext *s, RTMPPacket *pkt) av_log(s, AV_LOG_ERROR, "Server error: %s\n",tmpstr); return -1; } else if (!memcmp(pkt->data, "\002\000\007_result", 10)) { - switch (rt->state) { - case STATE_HANDSHAKED: - if (!rt->is_input) { - if ((ret = gen_release_stream(s, rt)) < 0) - return ret; - if ((ret = gen_fcpublish_stream(s, rt)) < 0) - return ret; - rt->state = STATE_RELEASING; - } else { - if ((ret = gen_server_bw(s, rt)) < 0) - return ret; - rt->state = STATE_CONNECTING; - } - if ((ret = gen_create_stream(s, rt)) < 0) + char *tracked_method = NULL; + GetByteContext gbc; + double pkt_id; + + bytestream2_init(&gbc, pkt->data + 10, pkt->data_size); + if ((ret = ff_amf_read_number(&gbc, &pkt_id)) < 0) + return ret; + + if (find_tracked_method(rt, pkt_id, &tracked_method)) { + /* Ignore this reply when the current method is not tracked. */ + return 0; + } + + if (!memcmp(tracked_method, "connect", 7)) { + if (!rt->is_input) { + if ((ret = gen_release_stream(s, rt)) < 0) return ret; - break; - case STATE_FCPUBLISH: - rt->state = STATE_CONNECTING; - break; - case STATE_RELEASING: - rt->state = STATE_FCPUBLISH; - /* hack for Wowza Media Server, it does not send result for - * releaseStream and FCPublish calls */ - if (!pkt->data[10]) { - int pkt_id = av_int2double(AV_RB64(pkt->data + 11)); - if (pkt_id == rt->create_stream_invoke) - rt->state = STATE_CONNECTING; - } - if (rt->state != STATE_CONNECTING) - break; - case STATE_CONNECTING: - //extract a number from the result - if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) { - av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n"); - } else { - rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21)); - } - if (rt->is_input) { - if ((ret = gen_play(s, rt)) < 0) - return ret; - if ((ret = gen_buffer_time(s, rt)) < 0) - return ret; - } else { - if ((ret = gen_publish(s, rt)) < 0) - return ret; - } - rt->state = STATE_READY; - break; + if ((ret = gen_fcpublish_stream(s, rt)) < 0) + return ret; + } else { + if ((ret = gen_server_bw(s, rt)) < 0) + return ret; + } + if ((ret = gen_create_stream(s, rt)) < 0) + return ret; + } else if (!memcmp(tracked_method, "createStream", 12)) { + //extract a number from the result + if (pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) { + av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n"); + } else { + rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21)); + } + + if (!rt->is_input) { + if ((ret = gen_publish(s, rt)) < 0) + return ret; + } else { + if ((ret = gen_play(s, rt)) < 0) + return ret; + if ((ret = gen_buffer_time(s, rt)) < 0) + return ret; + } } } else if (!memcmp(pkt->data, "\002\000\010onStatus", 11)) { const uint8_t* ptr = pkt->data + 11; @@ -1244,6 +1289,7 @@ static int rtmp_close(URLContext *h) if (rt->state > STATE_HANDSHAKED) ret = gen_delete_stream(h, rt); + free_tracked_methods(rt); av_freep(&rt->flv_data); ffurl_close(rt->stream); return ret; @@ -1531,10 +1577,8 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size) if (rt->flv_off == rt->flv_size) { rt->skip_bytes = 4; - if ((ret = ff_rtmp_packet_write(rt->stream, &rt->out_pkt, - rt->chunk_size, rt->prev_pkt[1])) < 0) + if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0) return ret; - ff_rtmp_packet_destroy(&rt->out_pkt); rt->flv_size = 0; rt->flv_off = 0; rt->flv_header_bytes = 0; -- 1.7.11.1 _______________________________________________ libav-devel mailing list libav-devel@libav.org https://lists.libav.org/mailman/listinfo/libav-devel