Hi folks,
I'm trying to have a single UDP socket opened by the master process and
read data from it in the worker processes. Please find the test source
attached. I'm using the following simple Ruby script to generate test traffic:
#!/usr/bin/env ruby
# Test traffic generator: sends 10000 numbered UDP datagrams
# ("this is line N") to 127.0.0.1 port 1234.
require 'socket'

sock = UDPSocket.new
sock.connect("127.0.0.1", 1234)
10_000.times { |n| sock.send("this is line #{n}", 0) }
This works OK with a single child:
$ ./a.out 1 1234 >log & ./gen-udp-traffic.rb && sleep 5 && killall a.out
&& grep -c udp_read_cb: log
[1] 22560
[1]+ Terminated ./a.out 1 1234 > log
10000
but I see that some incoming packets are lost with 2 children:
$ ./a.out 2 1234 >log & ./gen-udp-traffic.rb && sleep 5 && killall a.out
&& grep -c udp_read_cb: log
[1] 23387
[1]+ Terminated ./a.out 2 1234 > log
8476
I run my tests on Ubuntu 14.04 x64. I use libev installed from the
package:
ii libev-dev 1:4.15-3 amd64 static
library, header files, and docs for libev
I compile my code using the following command:
gcc 024-udp-test.c -lev
I tried to find any hints by googling, but unfortunately couldn't find
anything relevant.
Please advise what I should do to avoid packet loss.
Thanks,
Kirill.
#include <errno.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <sys/types.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/fcntl.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>

#include <ev.h>
#define DATA_BUF_SIZE 4096
#define LOG_BUF_SIZE 2048

/*
 * Print one log line to stdout and flush it immediately.
 *
 * The line has the form "YYYY-mm-dd HH:MM:SS <pid> <message>\n", where the
 * timestamp is the current local time and <message> is produced from the
 * printf-style format and its arguments. The whole line is assembled in a
 * LOG_BUF_SIZE-byte buffer; anything that does not fit is truncated.
 *
 * format - printf-style format string for the message part.
 * ...    - arguments consumed by format.
 */
void log_msg(const char *format, ...) {
    char buffer[LOG_BUF_SIZE];
    size_t used = 0;
    time_t now = time(NULL);
    struct tm *tinfo = localtime(&now);
    va_list args;
    int n;

    /* localtime() may return NULL; in that case the timestamp is omitted. */
    if (tinfo != NULL) {
        used = strftime(buffer, sizeof buffer, "%Y-%m-%d %H:%M:%S", tinfo);
    }

    /* Bounded write of the PID; clamp the offset if the output was truncated. */
    n = snprintf(buffer + used, sizeof buffer - used, " %d ", (int) getpid());
    if (n < 0) {
        buffer[used] = '\0';
    } else if ((size_t) n >= sizeof buffer - used) {
        used = sizeof buffer - 1;
    } else {
        used += (size_t) n;
    }

    va_start(args, format);
    vsnprintf(buffer + used, sizeof buffer - used, format, args);
    va_end(args);

    fprintf(stdout, "%s\n", buffer);
    fflush(stdout);
}
/*
 * libev I/O callback for the UDP socket: receives one datagram from
 * watcher->fd and logs its contents (minus one trailing newline, if any).
 *
 * loop    - event loop that invoked the callback (unused).
 * watcher - I/O watcher whose fd is the socket to read from.
 * revents - event mask; if EV_ERROR is set the event is logged and ignored.
 */
void udp_read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    char buffer[DATA_BUF_SIZE];
    ssize_t bytes_in_buffer;

    (void) loop;

    if (EV_ERROR & revents) {
        log_msg("%s: invalid event %s", __func__, strerror(errno));
        return;
    }

    /*
     * Never block inside the event loop: when the same socket is watched by
     * more than one process, another process may already have consumed the
     * datagram that made the socket readable.
     */
    bytes_in_buffer = recv(watcher->fd, buffer, DATA_BUF_SIZE - 1, MSG_DONTWAIT);
    if (bytes_in_buffer < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            /* Nothing to read right now; wait for the next readiness event. */
            return;
        }
        log_msg("%s: recv() failed %s", __func__, strerror(errno));
        return;
    }

    if (bytes_in_buffer > 0) {
        if (buffer[bytes_in_buffer - 1] == '\n') {
            bytes_in_buffer--;
        }
        /* The "%.*s" precision must be an int, not an ssize_t. */
        log_msg("%s: got packet %.*s", __func__, (int) bytes_in_buffer, buffer);
    } else {
        log_msg("%s: got empty packet", __func__);
    }
}
/*
 * Periodic-timer callback: writes an "ok" log line each time it is invoked.
 * None of the libev arguments are used.
 */
void timer_cb(struct ev_loop *loop, struct ev_periodic *p, int revents) {
    (void) loop;
    (void) p;
    (void) revents;

    log_msg("%s: ok", __func__);
}
/*
 * Parse text as a base-10 integer within [min, max].
 * Stores the value in *out and returns 0 on success; returns -1 and leaves
 * *out untouched on empty input, trailing characters, or an out-of-range value.
 */
static int parse_int_arg(const char *text, long min, long max, int *out) {
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return -1;
    }
    if (value < min || value > max) {
        return -1;
    }
    *out = (int) value;
    return 0;
}

/*
 * Usage: <prog> num_of_childs data_port
 *
 * Binds one UDP socket on data_port (all interfaces), forks num_of_childs
 * worker processes that each run a libev loop watching that same socket
 * (udp_read_cb) plus a 10-second periodic timer (timer_cb), then waits for
 * the workers to exit.
 *
 * Returns 0 on success, 1 on a setup failure or if a worker could not be forked.
 */
int main(int argc, char *argv[]) {
    pid_t pids[16];
    /* The worker count is limited by the size of the pids array. */
    const int max_childs = (int) (sizeof pids / sizeof pids[0]);
    struct sockaddr_in addr;
    int data_socket;
    int num_of_childs = 0;
    int data_port = 0;
    int started = 0;
    int exit_code = 0;
    int status;
    int i;

    if (argc != 3
        || parse_int_arg(argv[1], 1, max_childs, &num_of_childs) != 0
        || parse_int_arg(argv[2], 1, 65535, &data_port) != 0) {
        printf("Usage: %s num_of_childs data_port\n", argc > 0 ? argv[0] : "a.out");
        exit(1);
    }

    if ((data_socket = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
        log_msg("%s: socket() error %s", __func__, strerror(errno));
        return 1;
    }

    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_port = htons((uint16_t) data_port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(data_socket, (struct sockaddr *) &addr, sizeof addr) != 0) {
        log_msg("%s: bind() failed %s", __func__, strerror(errno));
        return 1;
    }

    for (i = 0; i < num_of_childs; i++) {
        pid_t pid = fork();

        if (pid < 0) {
            /* Keep the workers already started; report the failure on exit. */
            log_msg("%s: fork() failed %s", __func__, strerror(errno));
            exit_code = 1;
            break;
        }

        if (pid == 0) {
            /* Worker: the event loop is created after fork(), inside the child. */
            struct ev_io socket_watcher;
            struct ev_periodic timer_watcher;
            const ev_tstamp timer_at = 0.0;
            const double timer_interval = 10.0;
            struct ev_loop *loop = ev_default_loop(EVFLAG_FORKCHECK);

            if (loop == NULL) {
                log_msg("%s: ev_default_loop() failed", __func__);
                return 1;
            }
            ev_io_init(&socket_watcher, udp_read_cb, data_socket, EV_READ);
            ev_io_start(loop, &socket_watcher);
            ev_periodic_init(&timer_watcher, timer_cb, timer_at, timer_interval, 0);
            ev_periodic_start(loop, &timer_watcher);
            ev_loop(loop, 0);
            log_msg("%s: ev_loop() exited", __func__);
            return 0;
        }

        pids[started++] = pid;
    }

    /* Only wait for workers that were actually forked; retry if interrupted. */
    for (i = 0; i < started; i++) {
        while (waitpid(pids[i], &status, 0) < 0 && errno == EINTR) {
            /* retry */
        }
    }

    return exit_code;
}
_______________________________________________
libev mailing list
[email protected]
http://lists.schmorp.de/mailman/listinfo/libev