vlc | branch: master | Justin Kim <justin....@collabora.com> | Wed Mar 28 19:29:45 2018 +0900| [ba758d7fa8a8d1e0de6c15dda11fd7d38fb24d84] | committer: Thomas Guillem
access: srt: refactor to support connection recovery This refactors the code to support SRT connection recovery. As part of this work, the SRT modules gain new features compared to the previous version. - Connection Recovery When an SRT connection is lost, this module can detect it and try to reconnect. - Interruptible by SRT APIs 'srt_epoll_wait' will be interrupted when all socket descriptors are removed from its monitoring list. Fortunately, SRT modules are using only one socket. - Platform Independent Now, SRT modules no longer need to use a pipe. Therefore, from this version, SRT modules can support Win32 platforms. Based on code from: Roman Diouskine <rdiousk...@haivision.com> Signed-off-by: Justin Kim <justin....@collabora.com> Signed-off-by: Thomas Guillem <tho...@gllm.fr> > http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=ba758d7fa8a8d1e0de6c15dda11fd7d38fb24d84 --- modules/access/Makefile.am | 1 + modules/access/srt.c | 341 +++++++++++++++++++++++++++------------------ 2 files changed, 203 insertions(+), 139 deletions(-) diff --git a/modules/access/Makefile.am b/modules/access/Makefile.am index d1578be151..6067e0f0d7 100644 --- a/modules/access/Makefile.am +++ b/modules/access/Makefile.am @@ -427,6 +427,7 @@ EXTRA_LTLIBRARIES += libaccess_mtp_plugin.la ### SRT ### libaccess_srt_plugin_la_SOURCES = access/srt.c +libaccess_srt_plugin_la_CFLAGS = $(AM_CFLAGS) $(SRT_CFLAGS) libaccess_srt_plugin_la_CPPFLAGS = $(AM_CPPFLAGS) $(SRT_CPPFLAGS) libaccess_srt_plugin_la_LIBADD = $(SRT_LIBS) libaccess_srt_plugin_la_LDFLAGS = $(AM_LDFLAGS) -rpath '$(accessdir)' diff --git a/modules/access/srt.c b/modules/access/srt.c index bf770a42a5..52a5bdf0f8 100644 --- a/modules/access/srt.c +++ b/modules/access/srt.c @@ -1,9 +1,11 @@ /***************************************************************************** * srt.c: SRT (Secure Reliable Transport) input module ***************************************************************************** - * Copyright (C) 2017, Collabora Ltd. 
+ * Copyright (C) 2017-2018, Collabora Ltd. + * Copyright (C) 2018, Haivision Systems Inc. * * Authors: Justin Kim <justin....@collabora.com> + * Roman Diouskine <rdiousk...@haivision.com> * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published by @@ -24,9 +26,6 @@ # include "config.h" #endif -#include <errno.h> -#include <fcntl.h> - #include <vlc_common.h> #include <vlc_interrupt.h> #include <vlc_fs.h> @@ -61,21 +60,27 @@ struct stream_sys_t { SRTSOCKET sock; int i_poll_id; - int i_poll_timeout; - int i_latency; - size_t i_chunk_size; - int i_pipe_fds[2]; + vlc_mutex_t lock; + bool b_interrupted; }; static void srt_wait_interrupted(void *p_data) { stream_t *p_stream = p_data; stream_sys_t *p_sys = p_stream->p_sys; - msg_Dbg( p_stream, "Waking up srt_epoll_wait"); - if ( write( p_sys->i_pipe_fds[1], &( bool ) { true }, sizeof( bool ) ) < 0 ) + + vlc_mutex_lock( &p_sys->lock ); + if ( p_sys->i_poll_id >= 0 && p_sys->sock != SRT_INVALID_SOCK ) { - msg_Err( p_stream, "Failed to send data to pipe"); + p_sys->b_interrupted = true; + + msg_Dbg( p_stream, "Waking up srt_epoll_wait"); + + /* Removing all socket descriptors from the monitoring list + * wakes up SRT's threads. We only have one to remove. 
*/ + srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock ); } + vlc_mutex_unlock( &p_sys->lock ); } static int Control(stream_t *p_stream, int i_query, va_list args) @@ -102,99 +107,28 @@ static int Control(stream_t *p_stream, int i_query, va_list args) return i_ret; } -static block_t *BlockSRT(stream_t *p_stream, bool *restrict eof) +static bool srt_schedule_reconnect(stream_t *p_stream) { - stream_sys_t *p_sys = p_stream->p_sys; - - block_t *pkt = block_Alloc( p_sys->i_chunk_size ); - - if ( unlikely( pkt == NULL ) ) - { - return NULL; - } - - vlc_interrupt_register( srt_wait_interrupted, p_stream); - - SRTSOCKET ready[2]; - - if ( srt_epoll_wait( p_sys->i_poll_id, - ready, &(int) { 2 }, NULL, 0, p_sys->i_poll_timeout, - &(int) { p_sys->i_pipe_fds[0] }, &(int) { 1 }, NULL, 0 ) == -1 ) - { - int srt_err = srt_getlasterror( NULL ); - - /* Assuming that timeout error is normal when SRT socket is connected. */ - if ( srt_err == SRT_ETIMEOUT && srt_getsockstate( p_sys->sock ) == SRTS_CONNECTED ) - { - goto skip; - } - - msg_Err( p_stream, "released poll wait (reason : %s)", srt_getlasterror_str() ); - goto endofstream; - } - - bool cancel = 0; - int ret = read( p_sys->i_pipe_fds[0], &cancel, sizeof( bool ) ); - if ( ret > 0 && cancel ) - { - msg_Dbg( p_stream, "Cancelled running" ); - goto endofstream; - } - - int stat = srt_recvmsg( p_sys->sock, (char *)pkt->p_buffer, p_sys->i_chunk_size ); - - if ( stat == SRT_ERROR ) - { - msg_Err( p_stream, "failed to recevie SRT packet (reason: %s)", srt_getlasterror_str() ); - goto endofstream; - } - - pkt->i_buffer = stat; - vlc_interrupt_unregister(); - return pkt; - -endofstream: - msg_Dbg( p_stream, "EOS"); - *eof = true; -skip: - block_Release(pkt); - srt_clearlasterror(); - vlc_interrupt_unregister(); - - return NULL; -} + int i_latency; + int stat; + char *psz_passphrase = NULL; + vlc_url_t parsed_url = { 0 }; -static int Open(vlc_object_t *p_this) -{ - stream_t *p_stream = (stream_t*)p_this; - stream_sys_t *p_sys = 
NULL; - vlc_url_t parsed_url = { 0 }; struct addrinfo hints = { .ai_socktype = SOCK_DGRAM, }, *res = NULL; - int stat; - - char *psz_passphrase = NULL; - p_sys = vlc_obj_malloc( p_this, sizeof( *p_sys ) ); - if( unlikely( p_sys == NULL ) ) - return VLC_ENOMEM; + stream_sys_t *p_sys = p_stream->p_sys; + bool failed = false; if ( vlc_UrlParse( &parsed_url, p_stream->psz_url ) == -1 ) { - msg_Err( p_stream, "Failed to parse a given URL (%s)", p_stream->psz_url ); - goto failed; - } - - p_sys->i_chunk_size = var_InheritInteger( p_stream, "chunk-size" ); - p_sys->i_poll_timeout = var_InheritInteger( p_stream, "poll-timeout" ); - p_sys->i_latency = var_InheritInteger( p_stream, "latency" ); - p_sys->i_poll_id = -1; - p_stream->p_sys = p_sys; - p_stream->pf_block = BlockSRT; - p_stream->pf_control = Control; + msg_Err( p_stream, "Failed to parse input URL (%s)", + p_stream->psz_url ); - psz_passphrase = var_InheritString( p_stream, "passphrase" ); + failed = true; + goto out; + } stat = vlc_getaddrinfo( parsed_url.psz_host, parsed_url.i_port, &hints, &res ); if ( stat ) @@ -204,91 +138,217 @@ static int Open(vlc_object_t *p_this) parsed_url.i_port, gai_strerror( stat ) ); - goto failed; + failed = true; + goto out; + } + + /* Always start with a fresh socket */ + if (p_sys->sock != SRT_INVALID_SOCK) + { + srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock ); + srt_close( p_sys->sock ); } p_sys->sock = srt_socket( res->ai_family, SOCK_DGRAM, 0 ); - if ( p_sys->sock == SRT_ERROR ) + if ( p_sys->sock == SRT_INVALID_SOCK ) { msg_Err( p_stream, "Failed to open socket." 
); - goto failed; + failed = true; + goto out; } /* Make SRT non-blocking */ - srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, &(bool) { false }, sizeof( bool ) ); + srt_setsockopt( p_sys->sock, 0, SRTO_SNDSYN, + &(bool) { false }, sizeof( bool ) ); + srt_setsockopt( p_sys->sock, 0, SRTO_RCVSYN, + &(bool) { false }, sizeof( bool ) ); /* Make sure TSBPD mode is enable (SRT mode) */ - srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, &(int) { 1 }, sizeof( int ) ); + srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDMODE, + &(int) { 1 }, sizeof( int ) ); + + /* This is an access module so it is always a receiver */ + srt_setsockopt( p_sys->sock, 0, SRTO_SENDER, + &(int) { 0 }, sizeof( int ) ); /* Set latency */ - srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, &p_sys->i_latency, sizeof( int ) ); + i_latency = var_InheritInteger( p_stream, "latency" ); + srt_setsockopt( p_sys->sock, 0, SRTO_TSBPDDELAY, + &i_latency, sizeof( int ) ); + psz_passphrase = var_InheritString( p_stream, "passphrase" ); if ( psz_passphrase != NULL && psz_passphrase[0] != '\0') { int i_key_length = var_InheritInteger( p_stream, "key-length" ); - srt_setsockopt( p_sys->sock, 0, SRTO_PASSPHRASE, psz_passphrase, strlen( psz_passphrase ) ); srt_setsockopt( p_sys->sock, 0, SRTO_PBKEYLEN, &i_key_length, sizeof( int ) ); } - p_sys->i_poll_id = srt_epoll_create(); - if ( p_sys->i_poll_id == -1 ) + srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, + &(int) { SRT_EPOLL_ERR | SRT_EPOLL_IN }); + + /* Schedule a connect */ + msg_Dbg( p_stream, "Schedule SRT connect (dest addresss: %s, port: %d).", + parsed_url.psz_host, parsed_url.i_port); + + stat = srt_connect( p_sys->sock, res->ai_addr, res->ai_addrlen); + if ( stat == SRT_ERROR ) { - msg_Err( p_stream, "Failed to create poll id for SRT socket." 
); - goto failed; + msg_Err( p_stream, "Failed to connect to server (reason: %s)", + srt_getlasterror_str() ); + failed = true; } - if ( vlc_pipe( p_sys->i_pipe_fds ) != 0 ) +out: + if (failed && p_sys->sock != SRT_INVALID_SOCK) { - msg_Err( p_stream, "Failed to create pipe fds." ); - goto failed; + srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock ); + srt_close(p_sys->sock); + p_sys->sock = SRT_INVALID_SOCK; } - fcntl( p_sys->i_pipe_fds[0], F_SETFL, O_NONBLOCK | fcntl( p_sys->i_pipe_fds[0], F_GETFL ) ); + vlc_UrlClean( &parsed_url ); + freeaddrinfo( res ); + free( psz_passphrase ); - srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, &(int) { SRT_EPOLL_IN } ); - srt_epoll_add_ssock( p_sys->i_poll_id, p_sys->i_pipe_fds[0], &(int) { SRT_EPOLL_IN } ); + return !failed; +} - stat = srt_connect( p_sys->sock, res->ai_addr, sizeof (struct sockaddr) ); +static block_t *BlockSRT(stream_t *p_stream, bool *restrict eof) +{ + stream_sys_t *p_sys = p_stream->p_sys; + int i_chunk_size = var_InheritInteger( p_stream, "chunk-size" ); + int i_poll_timeout = var_InheritInteger( p_stream, "poll-timeout" ); - if ( stat == SRT_ERROR ) + if ( vlc_killed() ) { - msg_Err( p_stream, "Failed to connect to server." ); - goto failed; + /* We are told to stop. Stop. 
*/ + return NULL; } - vlc_UrlClean( &parsed_url ); - freeaddrinfo( res ); - free (psz_passphrase); - - return VLC_SUCCESS; + block_t *pkt = block_Alloc( i_chunk_size ); + if ( unlikely( pkt == NULL ) ) + { + return NULL; + } -failed: + vlc_interrupt_register( srt_wait_interrupted, p_stream); - if ( parsed_url.psz_host != NULL - && parsed_url.psz_buffer != NULL) + SRTSOCKET ready[1]; + int readycnt = 1; + while ( srt_epoll_wait( p_sys->i_poll_id, + ready, &readycnt, 0, 0, + i_poll_timeout, NULL, 0, NULL, 0 ) >= 0) { - vlc_UrlClean( &parsed_url ); + if ( readycnt < 0 || ready[0] != p_sys->sock ) + { + /* should never happen, force recovery */ + srt_close(p_sys->sock); + p_sys->sock = SRT_INVALID_SOCK; + } + + switch( srt_getsockstate( p_sys->sock ) ) + { + case SRTS_CONNECTED: + /* Good to go */ + break; + case SRTS_BROKEN: + case SRTS_NONEXIST: + case SRTS_CLOSED: + /* Failed. Schedule recovery. */ + if ( !srt_schedule_reconnect( p_stream ) ) + msg_Err( p_stream, "Failed to schedule connect" ); + /* Fall-through */ + default: + /* Not ready */ + continue; + } + + int stat = srt_recvmsg( p_sys->sock, + (char *)pkt->p_buffer, i_chunk_size ); + if ( stat > 0 ) + { + pkt->i_buffer = stat; + goto out; + } + + msg_Err( p_stream, "failed to receive packet, set EOS (reason: %s)", + srt_getlasterror_str() ); + *eof = true; + break; } - if ( res != NULL ) + /* if the poll reports errors for any reason at all + * including a timeout, or there is a read error, + * we skip the turn. 
+ */ + + block_Release(pkt); + pkt = NULL; + +out: + vlc_interrupt_unregister(); + + /* Re-add the socket to the poll if we were interrupted */ + vlc_mutex_lock( &p_sys->lock ); + if ( p_sys->b_interrupted ) { - freeaddrinfo( res ); + srt_epoll_add_usock( p_sys->i_poll_id, p_sys->sock, + &(int) { SRT_EPOLL_ERR | SRT_EPOLL_IN } ); + p_sys->b_interrupted = false; } + vlc_mutex_unlock( &p_sys->lock ); + + return pkt; +} + +static int Open(vlc_object_t *p_this) +{ + stream_t *p_stream = (stream_t*)p_this; + stream_sys_t *p_sys = NULL; + + p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) ); + if( unlikely( p_sys == NULL ) ) + return VLC_ENOMEM; + + srt_startup(); - vlc_close( p_sys->i_pipe_fds[0] ); - vlc_close( p_sys->i_pipe_fds[1] ); + vlc_mutex_init( &p_sys->lock ); - if ( p_sys->i_poll_id != -1 ) + p_stream->p_sys = p_sys; + + p_sys->i_poll_id = srt_epoll_create(); + if ( p_sys->i_poll_id == -1 ) { - srt_epoll_release( p_sys->i_poll_id ); - p_sys->i_poll_id = -1; + msg_Err( p_stream, "Failed to create poll id for SRT socket." 
); + goto failed; } - srt_close( p_sys->sock ); - free (psz_passphrase); + if ( !srt_schedule_reconnect( p_stream ) ) + { + msg_Err( p_stream, "Failed to schedule connect"); + + goto failed; + } + + p_stream->pf_block = BlockSRT; + p_stream->pf_control = Control; + + return VLC_SUCCESS; + +failed: + vlc_mutex_destroy( &p_sys->lock ); + + if ( p_sys != NULL ) + { + if ( p_sys->sock != -1 ) srt_close( p_sys->sock ); + if ( p_sys->i_poll_id != -1 ) srt_epoll_release( p_sys->i_poll_id ); + + vlc_obj_free( p_this, p_sys ); + p_stream->p_sys = NULL; + } return VLC_EGENERIC; } @@ -298,16 +358,19 @@ static void Close(vlc_object_t *p_this) stream_t *p_stream = (stream_t*)p_this; stream_sys_t *p_sys = p_stream->p_sys; - if ( p_sys->i_poll_id != -1 ) + if ( p_sys ) { + vlc_mutex_destroy( &p_sys->lock ); + + srt_epoll_remove_usock( p_sys->i_poll_id, p_sys->sock ); + srt_close( p_sys->sock ); srt_epoll_release( p_sys->i_poll_id ); - p_sys->i_poll_id = -1; + + vlc_obj_free( p_this, p_sys ); + p_stream->p_sys = NULL; } - msg_Dbg( p_stream, "closing server" ); - srt_close( p_sys->sock ); - vlc_close( p_sys->i_pipe_fds[0] ); - vlc_close( p_sys->i_pipe_fds[1] ); + srt_cleanup(); } /* Module descriptor */ _______________________________________________ vlc-commits mailing list vlc-commits@videolan.org https://mailman.videolan.org/listinfo/vlc-commits