Hi,
We've encountered a race condition when using epoll in combination with pf_ring.
The attached program tc-test.c triggers the issue when compiled with -DUSEEPOLL
(telling it to use epoll). It works fine when compile without the USEEPOLL flag
(then it uses poll).
We work on Linux kernel 3.3.8 but I think the same issue will be present in
other kernels.
There is a race condition in the pf_ring kernel module: when packets have
already been received before fs/eventpoll.c::ep_insert() calls ring_poll(). In
more detail:
The function fs/eventpoll.c::ep_ptable_queue_proc() is responsible for
registering fs/eventpoll.c::ep_poll_callback() as the epoll callback function
that should be called when packets arrive while epoll is sleeping. The
ep_ptable_queue_proc function is called indirectly by pf_ring.c::ring_poll()
through the call to poll_wait(file, &pfr->ring_slots_waitqueue, wait); The
argument wait is provided as an argument to ring_poll():
unsigned int ring_poll(struct file *file, struct socket *sock, poll_table *
wait)
The only place in the epoll code where ring_poll() gets called with a
poll_table that has a callback function is in fs/eventpoll.c::ep_insert().
The ring_poll() function doesn't always call poll_wait(), however:
if(num_queued_pkts(pfr) < pfr->poll_num_pkts_watermark) {
poll_wait(file, &pfr->ring_slots_waitqueue, wait);
// smp_mb();
}
This implies that when enough packets have been received (num_queued_pkts(pfr)
>= pfr->poll_num_pkts_watermark) before ep_insert() adds the file descriptor to
its list, the ep_poll_callback function will never be called. This is because
ring_poll() won't call poll_wait when invoked by ep_insert(). In the test
program this scenario happens when enough packets were received by pf_ring
before the call to epoll_ctl.
Note that the poll system call seems to always call ring_poll() with a valid
wait->qproc function. So the race condition only exists for epoll.
We have a simple workaround that solves our problem. It's obviously not a real
fix. A patch to the current pf_ring codebase is given below. Note that it won't
compile on newer versions of the Linux kernel because the member was renamed to
_qproc (but the idea behind the patch should be clear). Note that I've tested
PF_RING-5.5.0 and PF_RING-5.5.2, but it looks like the problem is still there
in the git repo. The commented out part below seems to indicate similar issues
have been seen before (the commented-out code wasn't there yet in 5.5.2).
--- a/kernel/pf_ring.c
+++ b/kernel/pf_ring.c
@@ -6342,7 +6342,7 @@ unsigned int ring_poll(struct file *file,
/* printk("Before [num_queued_pkts(pfr)=%u]\n", num_queued_pkts(pfr)); */
- if(num_queued_pkts(pfr) < pfr->poll_num_pkts_watermark /* ||
pfr->num_poll_calls == 1 */) {
+ if(num_queued_pkts(pfr) < pfr->poll_num_pkts_watermark || (wait &&
wait->qproc)) {
poll_wait(file, &pfr->ring_slots_waitqueue, wait);
// smp_mb();
}
Some feedback with more insight into this problem and perhaps a real fix would
be nice :)
cheers,
Bart
Newtec's New Complete Modem Portfolio... Discover more @
http://newproducts.newtec.eu
*** e-mail confidentiality footer *** This message and any attachments thereto
are confidential. They may also be privileged or otherwise protected by work
product immunity or other legal rules. If you have received it by mistake,
please let us know by e-mail reply and delete it from your system; you may not
copy this message or disclose its contents to anyone. E-mail transmission
cannot be guaranteed to be secure or error free as information could be
intercepted, corrupted, lost, destroyed, arrive late or incomplete, or contain
viruses. The sender therefore is in no way liable for any errors or omissions
in the content of this message, which may arise as a result of e-mail
transmission. If verification is required, please request a hard copy.
// test code that triggers a race condition when epoll is used together with pf_ring.
#include <stdio.h>
#include <pcap/pcap.h>
#include <stdlib.h>
#include <string.h>
#ifdef USEEPOLL
#include <sys/epoll.h>
#else
#include <poll.h>
#endif
#include <unistd.h>
#include <fcntl.h>
void
_pcap_reader(u_char *userdata, const struct pcap_pkthdr *h, const u_char *p)
{
}
#define MAX_EVENTS 10
int
set_nonblock_flag (int desc, int value)
{
int oldflags = fcntl (desc, F_GETFL, 0);
/* If reading the flags failed, return error indication now. */
if (oldflags == -1)
return -1;
/* Set just the flag we want to set. */
if (value != 0)
oldflags |= O_NONBLOCK;
else
oldflags &= ~O_NONBLOCK;
/* Store modified flag word in the descriptor. */
return fcntl (desc, F_SETFL, oldflags);
}
int main(int argc, char*argv[])
{
pcap_t *h_in, *h_out;
char errbuf[PCAP_ERRBUF_SIZE];
int err;
const u_char *packet;
struct pcap_pkthdr hdr;
int counter = 0;
int packet_from_pcap;
#ifdef USEEPOLL
struct epoll_event ev, events[MAX_EVENTS];
#endif
int nfds,n;
h_in = pcap_create("data1", errbuf);
if (h_in == NULL) {
fprintf(stderr, "Couldn't open device data1: %s\n", errbuf);
return 1;
}
if (pcap_set_snaplen(h_in, 1526)) {
fprintf(stderr, "could not set snaplen\n");
return 1;
}
if (pcap_set_timeout(h_in, -1)) {
fprintf(stderr, "could not set timeout\n");
return 1;
}
if (pcap_set_promisc(h_in, 1)) {
fprintf(stderr, "could not set promisc\n");
return 1;
}
if (pcap_set_buffer_size(h_in, 0x800000)) {
fprintf(stderr, "could not set promisc\n");
return 1;
}
if (pcap_activate(h_in)) {
fprintf(stderr, "could not activate\n");
return 1;
}
if (pcap_setdirection(h_in, PCAP_D_IN)) {
fprintf(stderr, "could not set direction\n");
return 1;
}
if (pcap_setnonblock(h_in, 1, errbuf) == -1) {
fprintf(stderr, "Could not put in non blocking mode: %s\n", errbuf);
return 1;
}
int pcapfd = pcap_get_selectable_fd(h_in);
if (pcapfd < 0) {
fprintf(stderr, "Could not get fd for polling\n");
return 4;
}
set_nonblock_flag(pcapfd, 1);
#ifdef USEEPOLL
int epollfd = epoll_create(10);
ev.events = EPOLLIN/* | EPOLLET*/;
ev.data.fd = pcapfd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, pcapfd, &ev) == -1) {
fprintf(stderr, "epoll_ctl: EPOLL_CTL_ADD failed\n");
return 3;
}
#endif
//while (pcap_dispatch(h_in, 10, _pcap_reader, NULL) > 0) {
// printf("flush before epoll_wait\n");
//}
//char buffer[65000];
//read(pcapfd, buffer, 65000);
int count = 0;
int vlag2 = 0;
sleep(1);
while(1) {
#ifdef USEEPOLL
// the sleep fixes it
//sleep(1);
nfds = epoll_wait(epollfd, events, MAX_EVENTS, 10000);
printf("nfds=%d\n", nfds);
if (nfds == -1) {
fprintf(stderr, "epoll_wait failed\n");
return 4;
}
vlag2 = 0;
for(n = 0; n < nfds; ++n) {
if (events[n].data.fd == pcapfd) {
printf("event %d\n", n);
vlag2 = 1;
}
}
#else
struct pollfd fds;
fds.fd = pcapfd;
fds.events = POLLIN;
nfds = poll(&fds, 1, 10000);
printf("nfds=%d\n", nfds);
if (nfds == -1) {
fprintf(stderr, "epoll_wait failed\n");
return 4;
}
if (nfds > 0)
vlag2 = 1;
else
vlag2 = 0;
#endif
if (vlag2 == 1) {
while(packet_from_pcap= pcap_dispatch(h_in, 100,_pcap_reader,(u_char *)NULL) > 0 ) {
++count;
printf("--> count=%d\n", count);
};
count = 0;
if (packet_from_pcap < 0) {
fprintf(stderr, "pcap_dispatch error\n");
return 5;
}
}
}
return 0;
}
_______________________________________________
Ntop-dev mailing list
[email protected]
http://listgateway.unipi.it/mailman/listinfo/ntop-dev