This code allows receiving a stream from a client with the PUBLISH command.
---
libavformat/rtmpproto.c | 470 ++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 449 insertions(+), 21 deletions(-)
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index 5342be8..e901b7e 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -29,6 +29,7 @@
#include "libavutil/intfloat.h"
#include "libavutil/lfg.h"
#include "libavutil/opt.h"
+#include "libavutil/random_seed.h"
#include "libavutil/sha.h"
#include "avformat.h"
#include "internal.h"
@@ -92,6 +93,8 @@ typedef struct RTMPContext {
int server_bw; ///< server bandwidth
int client_buffer_time; ///< client buffer time in ms
int flush_interval; ///< number of packets flushed
in the same request (RTMPT only)
+ int listen; ///< listen mode flag
+ int listen_timeout; ///< listen timeout to wait for
new connections
} RTMPContext;
#define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for
first client digest signing
@@ -267,6 +270,115 @@ static int gen_connect(URLContext *s, RTMPContext *rt)
return ret;
}
+static int read_connect(URLContext *s, RTMPContext *rt)
+{
+ RTMPPacket pkt = { 0 };
+ uint8_t *p;
+ int ret;
+ if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1])) < 0)
+ return ret;
+ if (!strcmp(pkt.data, "connect"))
+ return AVERROR_INVALIDDATA;
+ ff_rtmp_packet_destroy(&pkt);
+
+ // Send Window Acknowledgement Size (wireshark)
+ pkt.data = NULL;
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
+ RTMP_PT_SERVER_BW, 0, 4096)) < 0)
+ return ret;
+ p = pkt.data;
+ bytestream_put_be32(&p, rt->server_bw);
+ pkt.data_size = p - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+
+ // Send Peer Bandwidth
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
+ RTMP_PT_CLIENT_BW, 0, 4096)) < 0)
+ return ret;
+ p = pkt.data;
+ bytestream_put_be32(&p, 268435455);
+ bytestream_put_byte(&p, 2); // dynamic
+ pkt.data_size = p - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+
+ // Ping request
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
+ RTMP_PT_PING, 0, 4096)) < 0)
+ return ret;
+
+ p = pkt.data;
+ pkt.data_size = 6;
+ bytestream_put_be16(&p, 0); // 0 -> Stream Begin
+ bytestream_put_be32(&p, 0);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+
+ // Chunk size
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_CHUNK_SIZE, 0, 4096)) < 0)
+ return ret;
+
+ p = pkt.data;
+ pkt.data_size = 4;
+ bytestream_put_be32(&p, rt->chunk_size);
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+
+ // Send result_
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0, 8192)) < 0)
+ return ret;
+
+ p = pkt.data;
+ ff_amf_write_string(&p, "_result");
+ ff_amf_write_number(&p, 1);
+
+ ff_amf_write_object_start(&p);
+ ff_amf_write_field_name(&p, "fmsVer");
+ ff_amf_write_string(&p, "FMS/3,0,1,123");
+ ff_amf_write_field_name(&p, "capabilities");
+ ff_amf_write_number(&p, 31);
+ ff_amf_write_object_end(&p);
+
+ ff_amf_write_object_start(&p);
+ ff_amf_write_field_name(&p, "level");
+ ff_amf_write_string(&p, "status");
+ ff_amf_write_field_name(&p, "code");
+ ff_amf_write_string(&p, "NetConnection.Connect.Success");
+ ff_amf_write_field_name(&p, "description");
+ ff_amf_write_string(&p, "Connection succeeded.");
+ ff_amf_write_field_name(&p, "objectEncoding");
+ ff_amf_write_number(&p, 0);
+ ff_amf_write_object_end(&p);
+
+ pkt.data_size = p - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0, 4096)) < 0)
+ return ret;
+ p = pkt.data;
+ ff_amf_write_string(&p, "onBWDone");
+ ff_amf_write_number(&p, 0);
+ ff_amf_write_null(&p);
+ ff_amf_write_number(&p, 8192);
+ pkt.data_size = p - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+
+ return ret;
+}
+
/**
* Generate 'releaseStream' call and send it to the server. It should make
* the server release some channel for media streams.
@@ -814,6 +926,111 @@ static int rtmp_handshake(URLContext *s, RTMPContext *rt)
}
/**
+ * rtmp handshake server side
+ */
+static int rtmp_listen_handshake(URLContext *s, RTMPContext *rt)
+{
+ uint8_t buffer[RTMP_HANDSHAKE_PACKET_SIZE];
+ uint32_t epoch; // to context?
+ uint32_t my_epoch; // to context?
+ uint32_t temp;
+ uint8_t peer_random[RTMP_HANDSHAKE_PACKET_SIZE - 8]; // to context?
+ uint8_t my_random[RTMP_HANDSHAKE_PACKET_SIZE - 8]; // to context?
+ int randomidx = 0;
+ ssize_t inoutsize = 0;
+
+ inoutsize = ffurl_read(rt->stream, buffer, 1); // Receive C0
+ if (inoutsize == 0) return AVERROR(EIO);
+ if (buffer[0] == 3) /* Check Version */
+ if (!ffurl_write(rt->stream, buffer, 1)) { // Send S0
+ av_log(NULL, AV_LOG_ERROR,
+ "Unable to write answer - RTMP S0\n");
+ return AVERROR_PROTOCOL_NOT_FOUND;
+ }
+ /* Receive C1 */
+ inoutsize = ffurl_read(rt->stream, buffer, 4096);
+ if (inoutsize == 0)
+ return AVERROR(EIO);
+ if (inoutsize != RTMP_HANDSHAKE_PACKET_SIZE) {
+ av_log(NULL, AV_LOG_ERROR, "Erroneous C1 Message size %d"
+ " not following standard\n", (int)inoutsize);
+ return AVERROR_INVALIDDATA;
+ }
+ memcpy(&temp, buffer, 4);
+ epoch = ntohl(temp);
+ my_epoch = epoch;
+ memcpy(&temp, buffer + 4, 4);
+ temp = ntohl(temp);
+ if (temp != 0) {
+ av_log(NULL, AV_LOG_WARNING,
+ "Erroneous C1 Message zero != 0 -> %d\n", temp);
+ }
+ memcpy(peer_random, buffer + 8,
+ RTMP_HANDSHAKE_PACKET_SIZE - 8);
+ /* Send S1 */
+ /* By now same epoch will be send */
+ for (randomidx = 0;
+ randomidx < (RTMP_HANDSHAKE_PACKET_SIZE - 8);
+ randomidx += 4) {
+ temp = av_get_random_seed();
+ memcpy(my_random + randomidx, &temp, 4);
+ }
+ memcpy(buffer + 8, my_random, RTMP_HANDSHAKE_PACKET_SIZE - 8);
+ inoutsize = ffurl_write(rt->stream, buffer,
+ RTMP_HANDSHAKE_PACKET_SIZE);
+ if (inoutsize != RTMP_HANDSHAKE_PACKET_SIZE) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Unable to write answer - RTMP S1\n");
+ return AVERROR_INVALIDDATA;
+ }
+
+ /* Send S2 */
+ temp = htonl(epoch);
+ memcpy(buffer, &temp , 4);
+ // TODO: Time2 missing, by now 0
+ temp = 0;
+ memcpy(buffer + 4, &temp, 4);
+ memcpy(buffer + 8, peer_random,
+ RTMP_HANDSHAKE_PACKET_SIZE - 8);
+ inoutsize = ffurl_write(rt->stream, buffer,
+ RTMP_HANDSHAKE_PACKET_SIZE);
+ if (inoutsize != RTMP_HANDSHAKE_PACKET_SIZE) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Unable to write answer - RTMP S2\n");
+ return AVERROR_INVALIDDATA;
+ }
+
+ /* Receive C2 */
+ inoutsize = ffurl_read(rt->stream, buffer, 4096);
+ if (inoutsize == 0)
+ return AVERROR(EIO);
+ if (inoutsize != RTMP_HANDSHAKE_PACKET_SIZE) {
+ av_log(NULL, AV_LOG_ERROR, "Erroneous C2 Message size %d"
+ " not following standard\n", (int)inoutsize);
+ return AVERROR_INVALIDDATA;
+ }
+ memcpy(&temp, buffer, 4);
+ temp = ntohl(temp);
+ if (temp != my_epoch) {
+ av_log(NULL, AV_LOG_ERROR, "Erroneous C2 Message epoch"
+ " does not match up with C1 epoch\n");
+ return AVERROR_INVALIDDATA;
+ }
+
+ if (memcmp(buffer + 8, my_random,
+ RTMP_HANDSHAKE_PACKET_SIZE - 8) != 0) {
+ av_log(NULL, AV_LOG_ERROR, "Erroneous C2 Message random"
+ " does not match up\n");
+ return AVERROR_INVALIDDATA;
+ }
+
+ /* Handshake successful */
+ return 0;
+
+}
+
+
+/**
* Parse received packet and possibly perform some action depending on
* the packet contents.
* @return 0 for no errors, negative values for serious errors which prevent
@@ -1075,6 +1292,186 @@ static int get_packet(URLContext *s, int for_header)
}
}
+
+static int get_listen_packet(URLContext *s, int for_header)
+{
+ RTMPContext *rt = s->priv_data;
+ int ret = 0;
+ const uint8_t *p = NULL;
+ uint32_t chunksize = 0;
+ uint8_t commandbuffer[64];
+ char filename[64];
+ char statusmsg[128];
+ double doubleval = 0;
+ char *pchar = NULL;
+ uint32_t numitems = 0;
+ uint16_t propertylen = 0;
+ char property[64];
+ double propertyvalue = 0;
+ double seqnum = 0;
+
+ if (rt->state == STATE_STOPPED)
+ return AVERROR_EOF;
+
+ for (;;) {
+ RTMPPacket rpkt = { 0 };
+ if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt,
+ rt->chunk_size,
+ rt->prev_pkt[0])) <= 0) {
+ if (ret == 0) {
+ return AVERROR(EAGAIN);
+ } else {
+ return AVERROR(EIO);
+ }
+ }
+ p = rpkt.data;
+ switch(rpkt.type) {
+ case RTMP_PT_CHUNK_SIZE:
+ chunksize = bytestream_get_be32(&p);
+ if (chunksize != rt->chunk_size) {
+ av_log(s, AV_LOG_ERROR, "Invalid chunk size received\n");
+ return AVERROR_INVALIDDATA;
+ }
+ break;
+ case RTMP_PT_NOTIFY:
+ if (ff_amf_read_string(p, commandbuffer, sizeof(commandbuffer)))
+ return AVERROR_BUG;
+ if (!strcmp(commandbuffer, "@setDataFrame")) {
+ p += ff_amf_tag_size(p, rpkt.data + rpkt.data_size);
+ if (ff_amf_read_string(p, statusmsg, sizeof(statusmsg)))
+ return AVERROR_BUG;
+ if (strcmp(statusmsg, "onMetaData"))
+ return AVERROR_INVALIDDATA;
+ p += ff_amf_tag_size(p, rpkt.data + rpkt.data_size);
+ /* Parse ECMAArray */
+ if (p[0] != AMF_DATA_TYPE_MIXEDARRAY)
+ return AVERROR_INVALIDDATA;
+ p++;
+ numitems = bytestream_get_be32(&p);
+ while (numitems > 0) {
+ propertylen = bytestream_get_be16(&p);
+ if (propertylen + 1 > sizeof(property))
+ return AVERROR(EINVAL);
+ memcpy(property, p, propertylen);
+ property[propertylen] = '\0';
+ p += propertylen;
+ if (strcmp(property, "encoder")) {
+ if (ff_amf_read_number(p, &propertyvalue))
+ return AVERROR_BUG;
+ av_dlog(s, "ECMA %s %g\n", property,
+ propertyvalue);
+ } else {
+ av_dlog(s, "ECMA %s Ignored\n", property);
+ }
+ p += ff_amf_tag_size(p, rpkt.data + rpkt.data_size);
+ numitems--;
+ }
+ return 0;
+ } else
+ return AVERROR_INVALIDDATA;
+
+ break;
+ case RTMP_PT_INVOKE:
+ if (ff_amf_read_string(p, commandbuffer, sizeof(commandbuffer)))
+ return AVERROR_BUG;
+ p += ff_amf_tag_size(p, rpkt.data + rpkt.data_size);
+ ret = ff_amf_read_number(p, &doubleval);
+ if (ret)
+ return ret;
+ seqnum = doubleval;
+ p += ff_amf_tag_size(p, rpkt.data + rpkt.data_size);
+ ret = ff_amf_read_null(p);
+ if (ret)
+ return ret;
+ p += ff_amf_tag_size(p, rpkt.data + rpkt.data_size);
+ if (!strcmp(commandbuffer, "releaseStream") ||
+ !strcmp(commandbuffer, "FCPublish") ||
+ !strcmp(commandbuffer, "publish")) {
+ RTMPPacket pkt = { 0 };
+ uint8_t * pp;
+ ret = ff_amf_read_string(p, filename, sizeof(filename));
+ if (ret)
+ return ret;
+ p += ff_amf_tag_size(p, rpkt.data + rpkt.data_size);
+ // check with url
+ if (s->filename) {
+ pchar = strrchr(s->filename, '/');
+ pchar++;
+ if (strcmp(pchar, filename))
+ av_log(s, AV_LOG_WARNING, "Unexpected stream %s, expecting"
+ " %s\n", filename, pchar);
+ }
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0,
+ 4096)) < 0)
+ return ret;
+ pp = pkt.data;
+ if (!strcmp(commandbuffer, "FCPublish"))
+ ff_amf_write_string(&pp, "onFCPublish");
+ else if (!strcmp(commandbuffer, "publish")) {
+ ff_amf_write_string(&pp, "onStatus");
+ ff_amf_write_number(&pp, 0);
+ ff_amf_write_null(&pp);
+
+ ff_amf_write_object_start(&pp);
+ ff_amf_write_field_name(&pp, "level");
+ ff_amf_write_string(&pp, "status");
+ ff_amf_write_field_name(&pp, "code");
+ ff_amf_write_string(&pp, "NetStream.Publish.Start");
+ ff_amf_write_field_name(&pp, "description");
+ snprintf(statusmsg, sizeof(statusmsg),
+ "%s is now published", filename);
+ ff_amf_write_string(&pp, statusmsg);
+ ff_amf_write_field_name(&pp, "details");
+ ff_amf_write_string(&pp, filename);
+ ff_amf_write_field_name(&pp, "clientid");
+ snprintf(statusmsg, sizeof(statusmsg), "LibAVFormat %d",
+ LIBAVFORMAT_VERSION_INT);
+ ff_amf_write_string(&pp, statusmsg);
+ ff_amf_write_object_end(&pp);
+
+ } else {
+ ff_amf_write_string(&pp, "_result");
+ ff_amf_write_number(&pp, seqnum);
+ ff_amf_write_null(&pp);
+ }
+ pkt.data_size = pp - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+ } else if (!strcmp(commandbuffer, "_checkbw") ||
+ !strcmp(commandbuffer, "createStream")) {
+ RTMPPacket pkt = { 0 };
+ uint8_t * pp;
+ if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
+ RTMP_PT_INVOKE, 0,
+ 4096)) < 0)
+ return ret;
+ pp = pkt.data;
+ ff_amf_write_string(&pp, "_result");
+ ff_amf_write_number(&pp, seqnum);
+ ff_amf_write_null(&pp);
+ pkt.data_size = pp - pkt.data;
+ ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->chunk_size,
+ rt->prev_pkt[1]);
+ ff_rtmp_packet_destroy(&pkt);
+ } else {
+ av_log(s, AV_LOG_ERROR,
+ "Unexpected command in PT_INVOKE %s\n", commandbuffer);
+ return AVERROR_INVALIDDATA;
+ }
+ break;
+ default:
+ av_log(s, AV_LOG_ERROR, "Unexpected rtmp packet type\n");
+ return AVERROR_INVALIDDATA;
+ break;
+ }
+ ff_rtmp_packet_destroy(&rpkt);
+ }
+ return 0;
+}
+
+
static int rtmp_close(URLContext *h)
{
RTMPContext *rt = h->priv_data;
@@ -1113,6 +1510,9 @@ static int rtmp_open(URLContext *s, const char *uri, int
flags)
int port;
int ret;
+ if (rt->listen_timeout > 0)
+ rt->listen = 1;
+
rt->is_input = !(flags & AVIO_FLAG_WRITE);
av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname),
&port,
@@ -1125,7 +1525,12 @@ static int rtmp_open(URLContext *s, const char *uri, int
flags)
/* open the tcp connection */
if (port < 0)
port = RTMP_DEFAULT_PORT;
- ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
+ if (rt->listen)
+ ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port,
+ "?listen&listen_timeout=%d",
+ rt->listen_timeout * 1000);
+ else
+ ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
}
if ((ret = ffurl_open(&rt->stream, buf, AVIO_FLAG_READ_WRITE,
@@ -1135,9 +1540,10 @@ static int rtmp_open(URLContext *s, const char *uri, int
flags)
}
rt->state = STATE_START;
- if ((ret = rtmp_handshake(s, rt)) < 0)
+ if (!rt->listen && (ret = rtmp_handshake(s, rt)) < 0)
+ goto fail;
+ if (rt->listen && (ret = rtmp_listen_handshake(s, rt)) < 0)
goto fail;
-
rt->chunk_size = 128;
rt->state = STATE_HANDSHAKED;
@@ -1234,32 +1640,52 @@ static int rtmp_open(URLContext *s, const char *uri,
int flags)
av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
proto, path, rt->app, rt->playpath);
- if ((ret = gen_connect(s, rt)) < 0)
- goto fail;
+ if (!rt->listen) {
+ if ((ret = gen_connect(s, rt)) < 0)
+ goto fail;
- do {
- ret = get_packet(s, 1);
- } while (ret == EAGAIN);
- if (ret < 0)
- goto fail;
+ do {
+ ret = get_packet(s, 1);
+ } while (ret == EAGAIN);
+ if (ret < 0)
+ goto fail;
- if (rt->is_input) {
- // generate FLV header for demuxer
+ if (rt->is_input) {
+ // generate FLV header for demuxer
+ rt->flv_size = 13;
+ rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
+ rt->flv_off = 0;
+ memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size);
+ } else {
+ rt->flv_size = 0;
+ rt->flv_data = NULL;
+ rt->flv_off = 0;
+ rt->skip_bytes = 13;
+ }
+
+ s->max_packet_size = rt->stream->max_packet_size;
+ s->is_streamed = 1;
+ return 0;
+ } else {
+ if (read_connect(s, s->priv_data) < 0)
+ goto fail;
+ do {
+ ret = get_listen_packet(s, 1);
+ } while (ret == EAGAIN);
+ if (ret < 0)
+ goto fail;
+
+ rt->is_input = 1;
rt->flv_size = 13;
rt->flv_data = av_realloc(rt->flv_data, rt->flv_size);
rt->flv_off = 0;
memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size);
- } else {
- rt->flv_size = 0;
- rt->flv_data = NULL;
- rt->flv_off = 0;
- rt->skip_bytes = 13;
+ s->max_packet_size = rt->stream->max_packet_size;
+ s->is_streamed = 1;
+ rt->state = STATE_PLAYING;
+ return 0;
}
- s->max_packet_size = rt->stream->max_packet_size;
- s->is_streamed = 1;
- return 0;
-
fail:
rtmp_close(s);
return ret;
@@ -1423,6 +1849,8 @@ static const AVOption rtmp_options[] = {
{"rtmp_playpath", "Stream identifier to play or to publish",
OFFSET(playpath), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_swfurl", "URL of the SWF player. By default no value will be sent",
OFFSET(swfurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_tcurl", "URL of the target stream. Defaults to
rtmp://host[:port]/app.", OFFSET(tcurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0,
0, DEC|ENC},
+ {"rtmp_listen", "Listen for incomming rtmp connections", OFFSET(listen),
AV_OPT_TYPE_INT, {0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
+ {"timeout", "Maximum timeout (in seconds) to wait for incoming
connections. -1 is infinite. Implies -rtmp_listen 1", OFFSET(listen_timeout),
AV_OPT_TYPE_INT, {-1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
{ NULL },
};
--
1.7.10
_______________________________________________
libav-devel mailing list
[email protected]
https://lists.libav.org/mailman/listinfo/libav-devel