vlc | branch: master | Sergio Ammirata <ser...@ammirata.net> | Tue Mar 19 08:14:32 2019 -0400| [b1aea125d8f7e91c1fc6ad4e5a557a7a8ca21dec] | committer: Thomas Guillem
access: rist: add support for multicast

Signed-off-by: Thomas Guillem <tho...@gllm.fr>

> http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=b1aea125d8f7e91c1fc6ad4e5a557a7a8ca21dec
---

 modules/access/rist.c | 150 +++++++++++++++++++++++++++++++++++++-------------
 modules/access/rist.h |  32 ++++++++++-
 2 files changed, 143 insertions(+), 39 deletions(-)

diff --git a/modules/access/rist.c b/modules/access/rist.c
index f6e572b863..e06828c88a 100644
--- a/modules/access/rist.c
+++ b/modules/access/rist.c
@@ -85,7 +85,11 @@ typedef struct
     int i_max_packet_size;
     int i_poll_timeout;
     int i_poll_timeout_current;
-    bool eof_on_reset;
+    bool b_ismulticast;
+    bool b_sendnacks;
+    bool b_sendblindnacks;
+    bool b_disablenacks;
+    bool b_flag_discontinuity;
     block_fifo_t *p_fifo;
     vlc_mutex_t lock;
     uint64_t last_message;
@@ -97,6 +101,7 @@ typedef struct
     float vbr_ratio;
     uint16_t vbr_ratio_count;
     uint32_t i_lost_packets;
+    uint32_t i_nack_packets;
     uint32_t i_recovered_packets;
     uint32_t i_reordered_packets;
     uint32_t i_total_packets;
@@ -131,7 +136,7 @@ static struct rist_flow *rist_init_rx(void)
     if (!flow)
         return NULL;
 
-    flow->reset = 2;
+    flow->reset = 1;
     flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
 
     if ( unlikely( flow->buffer == NULL ) )
@@ -139,6 +144,10 @@ static struct rist_flow *rist_init_rx(void)
         free(flow);
         return NULL;
     }
+    flow->fd_in = -1;
+    flow->fd_nack = -1;
+    flow->fd_rtcp_m = -1;
+
     return flow;
 }
 
@@ -150,7 +159,7 @@ static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf,
     vlc_mutex_unlock( &lock );
 }
 
-static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url)
+static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url, bool b_ismulticast)
 {
     stream_sys_t *p_sys = p_access->p_sys;
     msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
@@ -166,21 +175,47 @@ static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed
     if (p_sys->flow->fd_in < 0)
     {
         msg_Err( p_access, "cannot open input socket" );
-        return NULL;
+        goto fail;
     }
 
-    p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
-        NULL, 0, IPPROTO_UDP);
+    if (b_ismulticast)
+    {
+        p_sys->flow->fd_rtcp_m = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
+            NULL, 0, IPPROTO_UDP);
+        if (p_sys->flow->fd_rtcp_m < 0)
+        {
+            msg_Err( p_access, "cannot open multicast nack socket" );
+            goto fail;
+        }
+        p_sys->flow->fd_nack = net_ConnectDgram(p_access, parsed_url->psz_host,
+            parsed_url->i_port + 1, -1, IPPROTO_UDP );
+    }
+    else
+    {
+        p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
+            NULL, 0, IPPROTO_UDP);
+    }
     if (p_sys->flow->fd_nack < 0)
     {
         msg_Err( p_access, "cannot open nack socket" );
-        return NULL;
+        goto fail;
     }
 
     populate_cname(p_sys->flow->fd_nack, p_sys->flow->cname);
     msg_Info(p_access, "our cname is %s", p_sys->flow->cname);
 
     return p_sys->flow;
+
+fail:
+    if (p_sys->flow->fd_in != -1)
+        vlc_close(p_sys->flow->fd_in);
+    if (p_sys->flow->fd_nack != -1)
+        vlc_close(p_sys->flow->fd_nack);
+    if (p_sys->flow->fd_rtcp_m != -1)
+        vlc_close(p_sys->flow->fd_rtcp_m);
+    free(p_sys->flow->buffer);
+    free(p_sys->flow);
+    return NULL;
 }
 
 static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
@@ -263,8 +298,9 @@ static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uin
     len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
 
     /* Write to Socket */
-    rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
-        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
+        rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
+            (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
     free(buf);
     buf = NULL;
 }
@@ -302,8 +338,9 @@ static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uin
     len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
 
     /* Write to Socket */
-    rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
-        (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
+    if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
+        rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
+            (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
     free(buf);
     buf = NULL;
 }
@@ -361,6 +398,7 @@ static void send_nacks(stream_t *p_access, struct rist_flow *flow)
     }
     if (nacks_len > 0)
     {
+        p_sys->i_nack_packets += nacks_len;
         block_t *pkt_nacks = block_Alloc(nacks_len * 2);
         if (pkt_nacks)
         {
@@ -485,6 +523,10 @@ static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_
 
             case RTCP_PT_SDES:
             {
+                if (p_sys->b_sendnacks == false)
+                    p_sys->b_sendnacks = true;
+                if (p_sys->b_ismulticast)
+                    return;
                 /* Check for changes in source IP address or port */
                 int8_t name_length = rtcp_sdes_get_name_length(buf);
                 if (name_length > bytes_left)
@@ -535,6 +577,10 @@ static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_
                 break;
 
             case RTCP_PT_SR:
+                if (p_sys->b_sendnacks == false)
+                    p_sys->b_sendnacks = true;
+                if (p_sys->b_ismulticast)
+                    return;
                 break;
 
             default:
@@ -566,17 +612,8 @@ static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf,
     uint32_t pkt_ts = rtp_get_timestamp(buf);
     bool retrasnmitted = false;
     bool success = true;
-    uint64_t now = vlc_tick_now();
 
-    if (flow->reset == 2)
-    {
-        if ((uint64_t)(now - p_sys->last_message) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) ) {
-            msg_Info(p_access, "Waiting for Sender's Coordinates, i.e. rtcp handshake ...");
-        }
-        p_sys->last_message = now;
-        return success;
-    }
-    else if (flow->reset == 1)
+    if (flow->reset == 1)
     {
         msg_Info(p_access, "Traffic detected after buffer reset");
         /* First packet in the queue */
@@ -585,6 +622,7 @@ static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf,
         flow->wi = idx;
         flow->ri = idx;
         flow->reset = 0;
+        p_sys->b_flag_discontinuity = true;
     }
 
     /* Check to see if this is a retransmission or a regular packet */
@@ -725,6 +763,7 @@ static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
         msg_Dbg(p_access, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]",
             loss_amount, flow->ri, flow->wi);
         p_sys->i_lost_packets += loss_amount;
+        p_sys->b_flag_discontinuity = true;
     }
 
     return pktout;
@@ -769,23 +808,30 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
     uint64_t now;
     *eof = false;
     block_t *pktout = NULL;
-    struct pollfd pfd[2];
+    struct pollfd pfd[3];
     int ret;
     ssize_t r;
     struct sockaddr_storage peer;
     socklen_t slen = sizeof(struct sockaddr_storage);
     struct rist_flow *flow = p_sys->flow;
 
-    if (vlc_killed() || (flow->reset == 1 && p_sys->eof_on_reset))
+    if (vlc_killed())
     {
         *eof = true;
         return NULL;
     }
 
+    int poll_sockets = 2;
     pfd[0].fd = flow->fd_in;
     pfd[0].events = POLLIN;
     pfd[1].fd = flow->fd_nack;
     pfd[1].events = POLLIN;
+    if (p_sys->b_ismulticast)
+    {
+        pfd[2].fd = flow->fd_rtcp_m;
+        pfd[2].events = POLLIN;
+        poll_sockets++;
+    }
 
     /* The protocol uses a fifo buffer with a fixed time delay.
      * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
@@ -795,7 +841,7 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
      * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
      * most cases.
      */
-    ret = vlc_poll_i11e(pfd, 2, p_sys->i_poll_timeout_current);
+    ret = vlc_poll_i11e(pfd, poll_sockets, p_sys->i_poll_timeout_current);
     if (unlikely(ret < 0))
         return NULL;
     else if (ret == 0)
@@ -827,6 +873,18 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
                 msg_Err(p_access, "socket %d error: %s\n", flow->fd_nack, gai_strerror(errno));
             }
             else {
+                if (p_sys->b_ismulticast == false)
+                    rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
+            }
+        }
+        if (p_sys->b_ismulticast && pfd[2].revents & POLLIN)
+        {
+            r = rist_ReadFrom_i11e(flow->fd_rtcp_m, buf, p_sys->i_max_packet_size,
+                (struct sockaddr *)&peer, &slen);
+            if (unlikely(r == -1)) {
+                msg_Err(p_access, "mcast socket %d error: %s\n",flow->fd_rtcp_m, gai_strerror(errno));
+            }
+            else {
                 rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
             }
         }
@@ -853,11 +911,6 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
                     p_sys->i_poll_timeout_nonzero_count++;
                 }
             }
-            else
-            {
-                if (p_sys->eof_on_reset)
-                    *eof = true;
-            }
         }
     }
 
@@ -901,13 +954,15 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
             quality -= (float)100*(float)(p_sys->i_lost_packets + p_sys->i_recovered_packets +
                 p_sys->i_reordered_packets)/(float)p_sys->i_total_packets;
         if (quality != 100)
-            msg_Info(p_access, "STATS: Total %u, Recovered %u, Reordered %u, Lost %u, VBR Score " \
-                "%.2f, Link Quality %.2f%%", p_sys->i_total_packets, p_sys->i_recovered_packets,
-                p_sys->i_reordered_packets, p_sys->i_lost_packets, ratio, quality);
+            msg_Info(p_access, "STATS: Total %u, Recovered %u/%u, Reordered %u, Lost %u, VBR " \
+                "Score %.2f, Link Quality %.2f%%", p_sys->i_total_packets,
+                p_sys->i_recovered_packets, p_sys->i_nack_packets, p_sys->i_reordered_packets,
+                p_sys->i_lost_packets, ratio, quality);
         p_sys->i_last_stat = now;
         p_sys->vbr_ratio = 0;
         p_sys->vbr_ratio_count = 0;
         p_sys->i_lost_packets = 0;
+        p_sys->i_nack_packets = 0;
         p_sys->i_recovered_packets = 0;
         p_sys->i_reordered_packets = 0;
         p_sys->i_total_packets = 0;
@@ -943,7 +998,13 @@ static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
     }
 
     if (pktout)
+    {
+        if (p_sys->b_flag_discontinuity) {
+            pktout->i_flags |= BLOCK_FLAG_DISCONTINUITY;
+            p_sys->b_flag_discontinuity = false;
+        }
         return pktout;
+    }
     else
         return NULL;
 }
@@ -961,6 +1022,8 @@ static void Clean( stream_t *p_access )
             net_Close (p_sys->flow->fd_in);
         if (p_sys->flow->fd_nack >= 0)
             net_Close (p_sys->flow->fd_nack);
+        if (p_sys->flow->fd_rtcp_m >= 0)
+            net_Close (p_sys->flow->fd_rtcp_m);
         for (int i=0; i<RIST_QUEUE_SIZE; i++) {
             struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
             if (pkt->buffer && pkt->buffer->i_buffer > 0) {
@@ -1008,7 +1071,8 @@ static int Open(vlc_object_t *p_this)
     }
 
     /* Initialize rist flow */
-    p_sys->flow = rist_udp_receiver(p_access, &parsed_url);
+    p_sys->b_ismulticast = is_multicast_address(parsed_url.psz_host);
+    p_sys->flow = rist_udp_receiver(p_access, &parsed_url, p_sys->b_ismulticast);
     vlc_UrlClean( &parsed_url );
     if (!p_sys->flow)
     {
@@ -1017,14 +1081,23 @@ static int Open(vlc_object_t *p_this)
         goto failed;
     }
 
+    p_sys->b_flag_discontinuity = false;
+    p_sys->b_disablenacks = var_InheritBool( p_access, "disable-nacks" );
+    p_sys->b_sendblindnacks = var_InheritBool( p_access, "mcast-blind-nacks" );
+    if (p_sys->b_sendblindnacks && p_sys->b_disablenacks == false)
+        p_sys->b_sendnacks = true;
+    else
+        p_sys->b_sendnacks = false;
     p_sys->nack_type = var_InheritInteger( p_access, "nack-type" );
     p_sys->i_max_packet_size = var_InheritInteger( p_access, "packet-size" );
     p_sys->i_poll_timeout = var_InheritInteger( p_access, "maximum-jitter" );
-    p_sys->eof_on_reset = var_InheritBool( p_access, "eof-on-reset" );
     p_sys->flow->retry_interval = var_InheritInteger( p_access, "retry-interval" );
-    p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
     p_sys->flow->max_retries = var_InheritInteger( p_access, "max-retries" );
     p_sys->flow->latency = var_InheritInteger( p_access, "latency" );
+    if (p_sys->b_disablenacks)
+        p_sys->flow->reorder_buffer = p_sys->flow->latency;
+    else
+        p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
     msg_Info(p_access, "Setting queue latency to %d ms", p_sys->flow->latency);
 
     /* Convert to rtp times */
@@ -1073,11 +1146,14 @@ vlc_module_begin ()
     add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER, N_("RIST reorder buffer (ms)"),
         NULL, true )
     add_integer( "max-retries", RIST_MAX_RETRIES, N_("RIST maximum retry count"), NULL, true )
-    add_bool( "eof-on-reset", false, "Trigger an EOF event when a buffer reset is triggered",
-        "This is probably useful when you are decoding but not so much if you are streaming", true )
     add_integer( "nack-type", NACK_FMT_RANGE,
         N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL, true )
     change_integer_list( nack_type, nack_type_names )
+    add_bool( "disable-nacks", false, "Disable NACK output packets",
+        "Use this to disable packet recovery", true )
+    add_bool( "mcast-blind-nacks", false, "Do not check for a valid rtcp message from the encoder",
+        "Send nack messages even when we have not confirmed that the encoder is on our local " \
+        "network.", true )
 
     set_capability( "access", 0 )
     add_shortcut( "rist", "tr06" )
diff --git a/modules/access/rist.h b/modules/access/rist.h
index 75ade277d2..ec0f6ae0f9 100644
--- a/modules/access/rist.h
+++ b/modules/access/rist.h
@@ -92,11 +92,11 @@ struct rist_flow {
     char cname[MAX_CNAME];
     struct sockaddr_storage peer_sockaddr;
     socklen_t peer_socklen;
-    uint16_t ri,
-        wi;
+    uint16_t ri, wi;
     int fd_in;
     int fd_out;
     int fd_rtcp;
+    int fd_rtcp_m;
     int fd_nack;
     uint8_t nacks_retries[RIST_QUEUE_SIZE];
     uint32_t hi_timestamp;
@@ -337,3 +337,31 @@ static inline ssize_t rist_Write(int fd, const void *buf, size_t len)
 {
     return rist_WriteTo(fd, buf, len, NULL, 0);
 }
+
+static bool is_multicast_address(char *psz_dst_server)
+{
+    int ret;
+    int ismulticast = 0;
+
+    struct addrinfo hint = {
+        .ai_socktype = SOCK_DGRAM,
+        .ai_protocol = IPPROTO_UDP,
+        .ai_flags = AI_NUMERICSERV | AI_IDN | AI_PASSIVE,
+    }, *res;
+
+    ret = vlc_getaddrinfo(psz_dst_server, 0, &hint, &res);
+    if (ret) {
+        return 0;
+    } else if(res->ai_family == AF_INET) {
+        unsigned long addr = ntohl(inet_addr(psz_dst_server));
+        ismulticast = IN_MULTICAST(addr);
+    } else if (res->ai_family == AF_INET6) {
+        if (strlen(psz_dst_server) >= 5 && (strncmp("[ff00", psz_dst_server, 5) == 0 ||
+            strncmp("[FF00", psz_dst_server, 5) == 0))
+            ismulticast = 1;
+    }
+
+    freeaddrinfo(res);
+
+    return ismulticast;
+}
\ No newline at end of file
_______________________________________________
vlc-commits mailing list
vlc-commits@videolan.org
https://mailman.videolan.org/listinfo/vlc-commits