--- publisher.c | 278 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ publisher.h | 134 +++++++++++++++++++++++++++++ 2 files changed, 412 insertions(+) create mode 100644 publisher.c create mode 100644 publisher.h
diff --git a/publisher.c b/publisher.c new file mode 100644 index 0000000..d1ccb95 --- /dev/null +++ b/publisher.c @@ -0,0 +1,278 @@ +#include "publisher.h" +#include "segment.h" +#include <libavutil/log.h> + +void client_log(struct Client *c) +{ + char state[64]; + sprintf("State: ", state); + switch(c->state) { + case FREE: + sprintf(state, "FREE"); + break; + case RESERVED: + sprintf(state, "RESERVED"); + break; + case WAIT: + sprintf(state, "WAIT"); + break; + case WRITABLE: + sprintf(state, "WRITABLE"); + break; + case BUSY: + sprintf(state, "BUSY"); + break; + case BUFFER_FULL: + sprintf(state, "BUFFER_FULL"); + break; + default: + sprintf(state, "UNDEFINED"); + break; + } + av_log(NULL, AV_LOG_INFO, "%s\n", state); +} + +void client_disconnect(struct Client *c) +{ + struct Segment *seg; + av_write_trailer(c->ofmt_ctx); + avio_close(c->ofmt_ctx->pb); + avformat_free_context(c->ofmt_ctx); + while(av_fifo_size(c->buffer)) { + av_fifo_generic_read(c->buffer, &seg, sizeof(struct Segment*), NULL); + segment_unref(seg); + } + client_set_state(c, FREE); + c->current_segment_id = -1; +} + +void client_set_state(struct Client *c, enum State state) +{ + pthread_mutex_lock(&c->state_lock); + c->state = state; + pthread_mutex_unlock(&c->state_lock); +} + +void client_push_segment(struct Client *c, struct Segment *seg) +{ + if (av_fifo_space(c->buffer) == 0) { + av_log(NULL, AV_LOG_WARNING, "Client buffer full, dropping Segment.\n"); + client_set_state(c, BUFFER_FULL); + return; + } + segment_ref(seg); + av_fifo_generic_write(c->buffer, &seg, sizeof(struct Segment*), NULL); + client_set_state(c, WRITABLE); +} + +void publisher_init(struct PublisherContext **pub) +{ + int i; + struct PublisherContext *pc = (struct PublisherContext*) malloc(sizeof(struct PublisherContext)); + pc->nb_threads = 4; + pc->current_segment_id = -1; + pc->shutdown = 0; + pc->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS); + pc->fs_buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS); + for (i = 0; i < MAX_CLIENTS; i++) { + struct Client *c = &pc->clients[i]; + c->buffer = av_fifo_alloc_array(sizeof(struct Segment), MAX_SEGMENTS); + c->id = i; + c->current_segment_id = -1; + pthread_mutex_init(&c->state_lock, NULL); + client_set_state(c, FREE); + } + *pub = pc; +} + +void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg) +{ + struct Segment *drop; + av_fifo_generic_write(pub->buffer, &seg, sizeof(struct Segment*), NULL); + segment_ref(seg); + if (av_fifo_size(pub->fs_buffer) >= BUFFER_SEGMENTS * sizeof(struct Segment*)) { + av_fifo_generic_read(pub->fs_buffer, &drop, sizeof(struct Segment*), NULL); + segment_unref(drop); + } + av_fifo_generic_write(pub->fs_buffer, &seg, sizeof(struct Segment*), NULL); + segment_ref(seg); +} + +int publisher_reserve_client(struct PublisherContext *pub) +{ + int i; + for (i = 0; i < MAX_CLIENTS; i++) { + switch(pub->clients[i].state) { + case FREE: + client_set_state(&pub->clients[i], RESERVED); + return 0; + default: + continue; + } + } + return 1; +} + +void publisher_cancel_reserve(struct PublisherContext *pub) +{ + int i; + for (i = 0; i < MAX_CLIENTS; i++) { + switch(pub->clients[i].state) { + case RESERVED: + client_set_state(&pub->clients[i], FREE); + return; + default: + continue; + } + } + return; +} + +void client_push_prebuffer(struct PublisherContext *pub, struct Client *c) +{ + int off; + int size; + struct Segment *seg; + size = av_fifo_size(pub->fs_buffer); + for (off = 0; off < size; off += sizeof(struct Segment*)) { + av_fifo_generic_peek_at(pub->fs_buffer, &seg, off, sizeof(struct Segment*), NULL); + client_push_segment(c, seg); + } +} + +void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx) +{ + int i; + struct Segment *prebuffer_seg; + for (i = 0; i < MAX_CLIENTS; i++) { + switch(pub->clients[i].state) { + case RESERVED: + pub->clients[i].ofmt_ctx = ofmt_ctx; + client_set_state(&pub->clients[i], WRITABLE); + client_push_prebuffer(pub, &pub->clients[i]); + return; + default: + continue; + } + } +} + +void publisher_free(struct PublisherContext *pub) +{ + int i; + struct Segment *seg; + while(av_fifo_size(pub->buffer)) { + av_fifo_generic_read(pub->buffer, &seg, sizeof(struct Segment*), NULL); + segment_unref(seg); + } + av_fifo_freep(&pub->buffer); + while(av_fifo_size(pub->fs_buffer)) { + av_fifo_generic_read(pub->fs_buffer, &seg, sizeof(struct Segment*), NULL); + segment_unref(seg); + } + av_fifo_freep(&pub->fs_buffer); + for (i = 0; i < MAX_CLIENTS; i++) { + av_fifo_freep(&pub->clients[i].buffer); + } + return; +} + +void publisher_freep(struct PublisherContext **pub) +{ + publisher_free(*pub); + *pub = NULL; + return; +} + +void publish(struct PublisherContext *pub) +{ + int i; + struct Segment *seg; + av_log(NULL, AV_LOG_DEBUG, "pub->buffer size: %d\n", av_fifo_size(pub->buffer)); + if (av_fifo_size(pub->buffer) == 0) + return; + av_log(NULL, AV_LOG_DEBUG, "Peeking buffer\n"); + av_fifo_generic_read(pub->buffer, &seg, sizeof(struct Segment*), NULL); + av_log(NULL, AV_LOG_DEBUG, "Peeked buffer\n"); + if (seg) { + pub->current_segment_id = seg->id; + for (i = 0; i < MAX_CLIENTS; i++) { + switch(pub->clients[i].state) { + case BUFFER_FULL: + av_log(NULL, AV_LOG_WARNING, "Dropping segment for client %d, buffer full.\n", i); + continue; + case WAIT: + case WRITABLE: + client_push_segment(&pub->clients[i], seg); + default: + continue; + } + } + segment_unref(seg); + } +} + +void publisher_gen_status_json(struct PublisherContext *pub, char *status) +{ + int nb_free = 0, nb_reserved = 0, nb_wait = 0, nb_writable = 0, nb_busy = 0, nb_buffer_full = 0, current_read = 0, newest_write = 0, oldest_write = 0; + int i; + struct Client *c; + + current_read = pub->current_segment_id; + oldest_write = current_read; + + for (i = 0; i < MAX_CLIENTS; i++) { + c = &pub->clients[i]; + if (c->current_segment_id > 0 && c->current_segment_id < oldest_write) { + oldest_write = c->current_segment_id; + } + if (c->current_segment_id > newest_write) { + newest_write = c->current_segment_id; + } + + switch(c->state) { + case FREE: + nb_free++; + continue; + case RESERVED: + nb_reserved++; + continue; + case WAIT: + nb_wait++; + continue; + case WRITABLE: + nb_writable++; + continue; + case BUSY: + nb_busy++; + continue; + case BUFFER_FULL: + nb_buffer_full++; + continue; + default: + continue; + } + } + + + snprintf(status, 4095, + "{\n\t\"free\": %d,\n" + "\t\"reserved\": %d,\n" + "\t\"wait\": %d,\n" + "\t\"writable\": %d,\n" + "\t\"busy\": %d\n" + "\t\"buffer_full\": %d\n" + "\t\"current_read\": %d\n" + "\t\"newest_write\": %d\n" + "\t\"oldest_write\": %d\n" + "}\n", + nb_free, + nb_reserved, + nb_wait, + nb_writable, + nb_busy, + nb_buffer_full, + current_read, + newest_write, + oldest_write); +} diff --git a/publisher.h b/publisher.h new file mode 100644 index 0000000..7646fda --- /dev/null +++ b/publisher.h @@ -0,0 +1,134 @@ +#ifndef PUBLISHER_H +#define PUBLISHER_H + +#include <libavformat/avformat.h> +#include <libavutil/fifo.h> +#include <pthread.h> +#include "segment.h" + +#define MAX_CLIENTS 16 +#define MAX_SEGMENTS 16 +#define BUFFER_SEGMENTS 10 + +/* Client State enum */ + +enum State { + FREE, // no client connected + RESERVED, // reserved for a client that just connected + WAIT, // up to date, no new Segments to write + WRITABLE, // buffer is not full, new Segments can be pushed + BUSY, // currently writing to this client + BUFFER_FULL // client buffer is full, new Segments will be dropped +}; + + +struct Client { + AVFormatContext *ofmt_ctx; // writable AVFormatContext, basically our tcp connection to the client + AVFifoBuffer *buffer; // Client buffer of Segment references + enum State state; + pthread_mutex_t state_lock; + int id; + int current_segment_id; // The stream-based id of the segment that has last been worked on. +}; + +struct PublisherContext { + struct Client clients[MAX_CLIENTS]; // currently compile-time configuration, easly made dynamic with malloc? + AVFifoBuffer *buffer; // publisher buffer for new Segments + AVFifoBuffer *fs_buffer; // fast start buffer + int nb_threads; + int current_segment_id; + int shutdown; // indicate shutdown, gracefully close client connections and files and exit +}; + +/** + * Log a client's stats to the console. + * + * @param c pointer to the client to print + */ +void client_log(struct Client *c); + +/** + * Disconnect a client + * + * @param c pointer to the client to disconnect. + */ +void client_disconnect(struct Client *c); + +/** + * Set a client's state. Note: This is protected by mutex locks. + * + * @param c pointer to the client to set the state of + * @param state the state to set the client to + */ +void client_set_state(struct Client *c, enum State state); + +/** + * Allocate and initialize a PublisherContext + * + * @param pub pointer to a pointer to a PublisherContext. It will be allocated and initialized. + */ +void publisher_init(struct PublisherContext **pub); + +/** + * Push a Segment to a PublisherContext. + * + * @param pub pointer to a PublisherContext + * @param seg pointer to the Segment to add + */ +void publisher_push_segment(struct PublisherContext *pub, struct Segment *seg); + +/** + * Reserve a slot in the client struct of a PublisherContext. May fail if the number + * of maximum clients has been reached. + * + * @param pub pointer to a PublisherContext + * @return 0 in case of success, 1 in case of failure + */ +int publisher_reserve_client(struct PublisherContext *pub); + +/** + * Cancel a single reservation. This can be used if a client spot was reserved, but the client + * unexpectedly disconnects or sends an invalid request. + * + * @param pub pointer to a PublisherContext + */ +void publisher_cancel_reserve(struct PublisherContext *pub); + +/** + * Add a client by its ofmt_ctx. This initializes an element in the client struct of the PublisherContext + * that has been reserved prior to calling this function. + * + * @param pub pointer to a PublisherContext + * @param ofmt_ctx AVFormatContext of a client + */ +void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx); + +/** + * Free buffers and associated client buffers. + * + * @param pub pointer to the PublisherContext to free + */ +void publisher_free(struct PublisherContext *pub); + +/** + * Free buffers and associated client buffers and set *pub to NULL. + * + * @param pub pointer to the PublisherContext pointer to free + */ +void publisher_freep(struct PublisherContext **pub); + +/** + * Signal to the PublisherContext to check its buffer and publish pending Segments. + * + * @param pub pointer to a PublisherContext + */ +void publish(struct PublisherContext *pub); + +/** + * Print the current client and file reading status to a json string. + * @param pub pointer to a PublisherContext + * @param status string of at least 4096 bytes size. + */ +void publisher_gen_status_json(struct PublisherContext *pub, char *status); + +#endif // PUBLISHER_H -- 2.16.2 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org http://ffmpeg.org/mailman/listinfo/ffmpeg-devel