Hi Marc,

Thanks for the quick response. I couldn't find any signs of errors on the sending side, but I was able to resolve this by opening a separate socket in each process with the SO_REUSEPORT option (details here: https://lwn.net/Articles/542629/). I also switched to pthreads to minimize context-switching overhead and removed logging, which was becoming a bottleneck under heavy incoming traffic. Please find a working prototype attached, and thanks for libev!

Happy New Year!

Thanks,
Kirill.

On 12/29/2015 09:27 PM, Marc Lehmann wrote:
On Tue, Dec 29, 2015 at 12:20:43PM -0800, Kirill Timofeev 
<[email protected]> wrote:
but I see that some incoming packets are lost with 2 childs:
All I can see is that 8476 packets apparently have been received, not that
they have been lost - you don't seem to do any error checking on send, and
depending on how fast ruby does its thing, it could easily make the kernel
run out of packet buffers.

so first make sure you don't get ENOBUFS or another error.

I haven't looked at your program in detail, but since running two
processes is much less efficient than one, I would first check that you
really did send all those packets.

I also don't know of anything that could go wrong specifically with
multiple processes, apart from the myriad issues around udp receiving in
general.


#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <netinet/in.h>
#include <ev.h>
#include <netdb.h>
#include <sys/fcntl.h>
#include <time.h>
#include <signal.h>
#include <stdarg.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>

/* Size in bytes of the stack buffer udp_read_cb() receives each datagram into. */
#define DATA_BUF_SIZE 4096
/* Size in bytes of the stack buffer log_msg() formats one complete log line into. */
#define LOG_BUF_SIZE 2048

/* Per-worker state: main() fills in one slot and passes its address to run_thread(). */
typedef struct {
    pthread_t thread;   /* handle written by pthread_create() and joined in main() */
    int id;             /* index of this slot in thread_args[]; copied into the libev watchers */
    int data_port;      /* UDP port the worker binds its socket to */
    int counter;        /* count of non-empty datagrams; incremented in udp_read_cb(),
                           summed in on_sigint().
                           NOTE(review): written and read without synchronization. */
} thread_args_s;

/* One slot per worker thread; the array has room for 16 workers. */
thread_args_s thread_args[16];

/*
 * An ev_periodic watcher tagged with the id of the worker that owns it.
 * `super` has to remain the first member: timer_cb() receives a
 * struct ev_periodic * and casts it back to this type to read `id`.
 */
struct ev_periodic_id {
    struct ev_periodic super;
    int id;   /* worker id, set by run_thread() */
};

/*
 * An ev_io watcher tagged with the id of the worker that owns it.
 * `super` has to remain the first member: udp_read_cb() receives a
 * struct ev_io * and casts it back to this type to read `id`.
 */
struct ev_io_id {
    struct ev_io super;
    int id;   /* worker id, used as the index into thread_args[] */
};

/* Number of worker threads; set from argv[1] in main() and used by on_sigint()
   as the loop bound when summing the per-thread counters. */
int num_of_childs = 0;

/*
 * Write one log line to stdout and flush it. The line has the form
 *   "YYYY-MM-DD HH:MM:SS <pid> <message>\n"
 * where the timestamp is local time.
 *
 * format, ...: printf-style format string and its arguments. The complete
 *              line is truncated to fit in LOG_BUF_SIZE bytes (including
 *              the terminating NUL).
 */
void log_msg(const char *format, ...) {
    char buffer[LOG_BUF_SIZE];
    size_t used = 0;
    struct tm tinfo;
    va_list args;
    time_t now;
    int n;

    time(&now);
    /* localtime_r() rather than localtime(): this function is called from
       several worker threads, and localtime() hands out a pointer to a
       single shared static struct tm. */
    if (localtime_r(&now, &tinfo) != NULL) {
        used = strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tinfo);
    }

    /* Bounded write of the pid; advance by what was actually stored. */
    n = snprintf(buffer + used, sizeof(buffer) - used, " %d ", (int)getpid());
    if (n > 0) {
        size_t room = sizeof(buffer) - used - 1;
        used += ((size_t)n < room) ? (size_t)n : room;
    }

    va_start(args, format);
    vsnprintf(buffer + used, sizeof(buffer) - used, format, args);
    va_end(args);

    fprintf(stdout, "%s\n", buffer);
    fflush(stdout);
}

/*
 * libev I/O callback for a worker's UDP socket.
 *
 * Receives one datagram from watcher->fd and, when it is non-empty, bumps
 * the counter of the worker whose id is stored in the enclosing
 * struct ev_io_id. Error events, recv() failures and empty datagrams are
 * only logged.
 *
 * loop:    event loop that dispatched the event (not used here)
 * watcher: the ev_io embedded at the start of a struct ev_io_id
 * revents: event mask reported by libev
 */
void udp_read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    char payload[DATA_BUF_SIZE];
    ssize_t received;
    struct ev_io_id *tagged = (struct ev_io_id *)watcher;

    if (revents & EV_ERROR) {
        log_msg("%s: invalid event %s", __func__, strerror(errno));
        return;
    }

    received = recv(watcher->fd, payload, sizeof(payload) - 1, 0);
    if (received < 0) {
        log_msg("%s: read() failed %s", __func__, strerror(errno));
        return;
    }
    if (received == 0) {
        log_msg("%s: got empty packet", __func__);
        return;
    }

    /* Drop a single trailing newline from the length. Per-packet logging of
       the payload is disabled, so the adjusted length is currently unused. */
    if (payload[received - 1] == '\n') {
        received -= 1;
    }
    thread_args[tagged->id].counter += 1;
}

void timer_cb(struct ev_loop *loop, struct ev_periodic *p, int revents) {
    log_msg("%s: %d ok", __func__, ((struct ev_periodic_id *)p)->id);
}

void *run_thread(void *args) {
    int data_socket;
    struct sockaddr_in addr;
    int id = (*((thread_args_s *)args)).id;
    int data_port = (*((thread_args_s *)args)).data_port;
    struct ev_loop *loop = NULL;
    struct ev_io_id socket_watcher;
    struct ev_periodic_id timer_watcher;
    ev_tstamp timer_at = 0.0;
    double timer_interval = 10.0;
    int optval = 1;

    if ((data_socket = socket(PF_INET, SOCK_DGRAM, 0)) < 0 ) {
        log_msg("%s: socket() error %s", __func__, strerror(errno));
        return NULL;
    }
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(data_port);
    addr.sin_addr.s_addr = INADDR_ANY;

    setsockopt(data_socket, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));

    if (bind(data_socket, (struct sockaddr*) &addr, sizeof(addr)) != 0) {
        log_msg("%s: bind() failed %s", __func__, strerror(errno));
        return NULL;
    }

    loop = ev_loop_new(0);
    ev_io_init((struct ev_io *)(&socket_watcher), udp_read_cb, data_socket, EV_READ);
    socket_watcher.id = id;
    ev_io_start(loop, (struct ev_io *)(&socket_watcher));

    ev_periodic_init ((struct ev_periodic *)(&timer_watcher), timer_cb, timer_at, timer_interval, 0);
    timer_watcher.id = id;
    ev_periodic_start (loop, (struct ev_periodic *)(&timer_watcher));

    ev_loop(loop, 0);
    log_msg("%s: ev_loop() exited", __func__);
    return NULL;
}

void on_sigint(int sig) {
    int i = 0;
    int total_counter = 0;

    for (i = 0; i < num_of_childs; i++) {
        total_counter += thread_args[i].counter;
    }
    log_msg("%s: sigint received %d", __func__, total_counter);
    exit(0);
}

int main(int argc, char *argv[]) {
    int i = 0, status;
    int data_port = 0;

    if (argc == 3) {
        num_of_childs = atoi(argv[1]);
        data_port = atoi(argv[2]);
    }
    if (data_port == 0 || num_of_childs == 0) {
       printf("Usage: %s num_of_childs data_port\n", argv[0]);
       exit(1);
    }

    if (signal(SIGINT, on_sigint) == SIG_ERR) {
        log_msg("%s: signal() failed", __func__);
        exit(1);
    }

    for (i = 0; i < num_of_childs; i++) {
        thread_args[i].id = i;
        thread_args[i].data_port = data_port;
        thread_args[i].counter = 0;
        pthread_create(&thread_args[i].thread, NULL, run_thread, (void *)(&thread_args[i]));
    }
    for (i = 0; i < num_of_childs; i++) {
        pthread_join(thread_args[i].thread, NULL);
    }
}
_______________________________________________
libev mailing list
[email protected]
http://lists.schmorp.de/mailman/listinfo/libev

Reply via email to