Hi,

We've encountered a race condition when using epoll in combination with pf_ring.
The attached program tc-test.c triggers the issue when compiled with -DUSEEPOLL 
(telling it to use epoll). It works fine when compile without the USEEPOLL flag 
(then it uses poll).
We work on Linux kernel 3.3.8 but I think the same issue will be present in 
other kernels.

There is a race condition in the pf_ring kernel module: when packets have 
already been received before fs/eventpoll.c::ep_insert() calls ring_poll(). In 
more detail:

The function fs/eventpoll.c::ep_ptable_queue_proc() is responsible for 
registering fs/eventpoll.c::ep_poll_callback() as the epoll callback function 
that should be called when packets arrive while epoll is sleeping. The 
ep_ptable_queue_proc function is called indirectly by pf_ring.c::ring_poll() 
through the call to poll_wait(file, &pfr->ring_slots_waitqueue, wait); The 
argument wait is provided as an argument to ring_poll():
unsigned int ring_poll(struct file *file, struct socket *sock, poll_table * 
wait)
The only place in the epoll code where ring_poll() gets called with a 
poll_table that has a callback function is in fs/eventpoll.c::ep_insert().
The ring_poll() function doesn't always call poll_wait(), however:
if(num_queued_pkts(pfr) < pfr->poll_num_pkts_watermark) {
  poll_wait(file, &pfr->ring_slots_waitqueue, wait);
  // smp_mb();
}

This implies that when enough packets have been received (num_queued_pkts(pfr) 
>= pfr->poll_num_pkts_watermark) before ep_insert() adds the file descriptor to 
its list, the ep_poll_callback function will never be called. This is because 
ring_poll() won't call poll_wait when invoked by ep_insert(). In the test 
program this scenario happens when enough packets were received by pf_ring 
before the call to epoll_ctl.

Note that the poll system call seems to always call ring_poll() with a valid 
wait->qproc function. So the race condition only exists for epoll.

We have a simple workaround that solves our problem. It's obviously not a real 
fix. A patch to the current pf_ring codebase is given below. Note that it won't 
compile on newer versions of the Linux kernel because the member was renamed to 
_qproc (but the idea behind the patch should be clear). Note that I've tested 
PF_RING-5.5.0 and PF_RING-5.5.2, but it looks like the problem is still there 
in the git repo. The commented out part below seems to indicate similar issues 
have been seen before (the commented-out code wasn't there yet in 5.5.2).

--- a/kernel/pf_ring.c
+++ b/kernel/pf_ring.c
@@ -6342,7 +6342,7 @@ unsigned int ring_poll(struct file *file,
 
     /* printk("Before [num_queued_pkts(pfr)=%u]\n", num_queued_pkts(pfr)); */
 
-    if(num_queued_pkts(pfr) < pfr->poll_num_pkts_watermark /* || 
pfr->num_poll_calls == 1 */) {
+    if(num_queued_pkts(pfr) < pfr->poll_num_pkts_watermark || (wait && 
wait->qproc)) {
       poll_wait(file, &pfr->ring_slots_waitqueue, wait);
       // smp_mb();
     }

Some feedback with more insight into this problem and perhaps a real fix would 
be nice :)

cheers,
Bart


Newtec's New Complete Modem Portfolio... Discover more @ 
http://newproducts.newtec.eu

*** e-mail confidentiality footer ***  This message and any attachments thereto 
are confidential. They may also be privileged or otherwise protected by work 
product immunity or other legal rules. If you have received it by mistake, 
please let us know by e-mail reply and delete it from your system; you may not 
copy this message or disclose its contents to anyone. E-mail transmission 
cannot be guaranteed to be secure or error free as information could be 
intercepted, corrupted, lost, destroyed, arrive late or incomplete, or contain 
viruses. The sender therefore is in no way liable for any errors or omissions 
in the content of this message, which may arise as a result of e-mail 
transmission. If verification is required, please request a hard copy.
// test code that triggers a race condition when epoll is used together with pf_ring.
#include <stdio.h>
#include <pcap/pcap.h>
#include <stdlib.h>
#include <string.h>
#ifdef USEEPOLL
#include <sys/epoll.h>
#else
#include <poll.h>
#endif
#include <unistd.h>

#include <fcntl.h>

void
_pcap_reader(u_char *userdata, const struct pcap_pkthdr *h, const u_char *p)
{
}

#define MAX_EVENTS 10
     int
     set_nonblock_flag (int desc, int value)
     {
       int oldflags = fcntl (desc, F_GETFL, 0);
       /* If reading the flags failed, return error indication now. */
       if (oldflags == -1)
         return -1;
       /* Set just the flag we want to set. */
       if (value != 0)
         oldflags |= O_NONBLOCK;
       else
         oldflags &= ~O_NONBLOCK;
       /* Store modified flag word in the descriptor. */
       return fcntl (desc, F_SETFL, oldflags);
     }


int main(int argc, char*argv[])
{
	pcap_t *h_in, *h_out;
	char errbuf[PCAP_ERRBUF_SIZE];
	int err;
	const u_char *packet;
	struct pcap_pkthdr hdr;
	int counter = 0;
	int packet_from_pcap;
#ifdef USEEPOLL
	struct epoll_event ev, events[MAX_EVENTS];
#endif
	int nfds,n;


	h_in = pcap_create("data1", errbuf);
	if (h_in == NULL) {
		fprintf(stderr, "Couldn't open device data1: %s\n", errbuf);
		return 1;
	}

	if (pcap_set_snaplen(h_in, 1526)) {
		fprintf(stderr, "could not set snaplen\n");
		return 1;
	}

	if (pcap_set_timeout(h_in, -1)) {
		fprintf(stderr, "could not set timeout\n");
		return 1;
	}
	if (pcap_set_promisc(h_in, 1)) {
		fprintf(stderr, "could not set promisc\n");
		return 1;
	}

	if (pcap_set_buffer_size(h_in, 0x800000)) {
		fprintf(stderr, "could not set promisc\n");
		return 1;
	}

	if (pcap_activate(h_in)) {
		fprintf(stderr, "could not activate\n");
		return 1;
	}

	if (pcap_setdirection(h_in, PCAP_D_IN)) {
		fprintf(stderr, "could not set direction\n");
		return 1;
	}

	if (pcap_setnonblock(h_in, 1, errbuf) == -1) {
		fprintf(stderr, "Could not put in non blocking mode: %s\n", errbuf);
		return 1;
	}

	int pcapfd = pcap_get_selectable_fd(h_in);
	if (pcapfd < 0) {
		fprintf(stderr, "Could not get fd for polling\n");
		return 4;
	}

	set_nonblock_flag(pcapfd, 1);

#ifdef USEEPOLL
	int epollfd = epoll_create(10);
	ev.events = EPOLLIN/* | EPOLLET*/;
	ev.data.fd = pcapfd;

	if (epoll_ctl(epollfd, EPOLL_CTL_ADD, pcapfd, &ev) == -1) {
		fprintf(stderr, "epoll_ctl: EPOLL_CTL_ADD failed\n");
		return 3;
	}
#endif

	//while (pcap_dispatch(h_in, 10, _pcap_reader, NULL) > 0) {
	//	printf("flush before epoll_wait\n");
	//}
	//char buffer[65000];
	//read(pcapfd, buffer, 65000);

	int count = 0;
	int vlag2 = 0;

sleep(1);
	while(1)  {
#ifdef USEEPOLL
// the sleep fixes it
//sleep(1);
		nfds = epoll_wait(epollfd, events, MAX_EVENTS, 10000);
printf("nfds=%d\n", nfds);
		if (nfds == -1) {
		  fprintf(stderr, "epoll_wait failed\n");
		  return 4;
		}

		vlag2 = 0;
		for(n = 0; n < nfds; ++n) {
			if (events[n].data.fd == pcapfd) {
				printf("event %d\n", n);
				vlag2 = 1;
			}
		}
#else
struct	pollfd	fds;
fds.fd = pcapfd;
fds.events = POLLIN;
nfds = poll(&fds, 1, 10000);
printf("nfds=%d\n", nfds);
		if (nfds == -1) {
		  fprintf(stderr, "epoll_wait failed\n");
		  return 4;
		}
		if (nfds > 0)
			vlag2 = 1;
		else
			vlag2 = 0;
#endif

		if (vlag2 == 1) {
				while(packet_from_pcap= pcap_dispatch(h_in, 100,_pcap_reader,(u_char *)NULL) > 0 ) {
++count;
printf("--> count=%d\n", count);
				};
				count = 0;

				if (packet_from_pcap < 0) {
					fprintf(stderr, "pcap_dispatch error\n");
					return 5;
				}
			}
		}

	return 0;

}

_______________________________________________
Ntop-dev mailing list
[email protected]
http://listgateway.unipi.it/mailman/listinfo/ntop-dev

Reply via email to