Revision: 489
http://vde.svn.sourceforge.net/vde/?rev=489&view=rev
Author: danielel
Date: 2011-04-08 11:43:36 +0000 (Fri, 08 Apr 2011)
Log Message:
-----------
Added branch for wirefilter2 experimental code. WIP.
Modified Paths:
--------------
branches/danielinux-wirefilter2/src/wirefilter.c
Added Paths:
-----------
branches/danielinux-wirefilter2/
Modified: branches/danielinux-wirefilter2/src/wirefilter.c
===================================================================
--- trunk/vde-2/src/wirefilter.c 2011-04-05 10:05:11 UTC (rev 488)
+++ branches/danielinux-wirefilter2/src/wirefilter.c 2011-04-08 11:43:36 UTC (rev 489)
@@ -1,11 +1,11 @@
-/* WIREFILTER (C) 2005 Renzo Davoli
+/* WIREFILTER2 (C) 2011 Renzo Davoli, Daniele Lacamera
* Licensed under the GPLv2
+ * Based on "wirefilter" by Renzo Davoli
* Modified by Ludovico Gardenghi 2005
* Modified by Renzo Davoli, Luca Bigliardi 2007
- * Modified by Renzo Davoli, Luca Raggi 2009 (Markov chain support)
- * Gauss normal distribution/blinking support, requested and parlty implemented
- * by Luca Saiu and Jean-Vincent Loddo (Marionnet project)
- * Gilbert model for packet loss requested by Leandro Galvao.
+ * Modified by Renzo Davoli, Luca Raggi 2009
+ * Some implementation by:
+ * Luca Saiu and Jean-Vincent Loddo (Marionnet project)
*
* This filter can be used for testing network protcols.
* It is possible to loose, delay or reorder packets.
@@ -82,6 +82,17 @@
#define MTU 8
#define NUMVALUES 9
+/* Size in bytes of a packet buffer; struct wf_packet uses it for its payload array. */
+#define BUFSIZE 2048
+#define MAXCMD 128
+/* NOTE(review): the *ARG values look like codes for long-only command line
+ * options -- confirm against the option table, which is not visible here. */
+#define MGMTMODEARG 129
+#define DAEMONIZEARG 130
+#define PIDFILEARG 131
+#define LOGSOCKETARG 132
+#define LOGIDARG 133
+/* Binary multipliers: 2^10, 2^20 and 2^30. */
+#define KILO (1<<10)
+#define MEGA (1<<20)
+#define GIGA (1<<30)
+
/* general Markov chain approach */
int markov_numnodes=0;
int markov_current=0;
@@ -121,6 +132,172 @@
static char *blinkmsg;
static char blinkidlen;
+/*
+ * Return the current time as reported by gettimeofday(), expressed in
+ * milliseconds (tv_sec * 1000 + tv_usec / 1000).
+ */
+static inline unsigned long long
+gettimeofdayms(void) {
+	struct timeval tv;
+	gettimeofday(&tv, 0);
+	return (unsigned long long) tv.tv_sec * 1000ULL + (unsigned long long) tv.tv_usec / 1000;
+}
+
+/* more than 98% inside the bell */
+#define SIGMA (1.0/3.0)
+/*
+ * Sample the value of parameter `tag` for direction `dir` in the current
+ * Markov node.
+ *
+ * When the configured spread (plus) is zero the nominal value is returned
+ * unchanged.  Otherwise the result is value + plus * r, where r is either
+ * uniform in [-1, 1) (ALGO_UNIFORM) or SIGMA times a normal deviate obtained
+ * with the polar method (ALGO_GAUSS_NORMAL).  Any other algorithm yields 0.0.
+ */
+static double compute_wirevalue(int tag, int dir)
+{
+	struct wirevalue *wv=&WFVAL(markov_current,tag,dir);
+	if (wv->plus == 0)
+		return wv->value;
+	switch (wv->alg) {
+		case ALGO_UNIFORM:
+			return wv->value+wv->plus*((drand48()*2.0)-1.0);
+		case ALGO_GAUSS_NORMAL:
+			{
+				double x,y,r2;
+				/* Reject points outside the unit disc and the exact
+				 * origin: r2 == 0 would evaluate log(0)/0 below and
+				 * turn the result into NaN. */
+				do {
+					x = (2*drand48())-1;
+					y = (2*drand48())-1;
+					r2=x*x+y*y;
+				} while (r2 >= 1.0 || r2 == 0.0);
+				return wv->value+wv->plus* SIGMA * x * sqrt ( (-2 * log(r2)) /r2);
+			}
+		default:
+			return 0.0;
+	}
+}
+
+
+/*** WF 2 ***/
+
+/*
+ * One packet travelling through the filter.  Packets are chained through
+ * `next` into the singly linked wf_queue_in / wf_queue_out lists.
+ */
+struct wf_packet {
+	struct wf_packet *next;	/* following packet in the list, NULL at the tail */
+	unsigned char payload[BUFSIZE];	/* raw packet bytes */
+	unsigned short size;	/* number of valid bytes in payload */
+	unsigned long long dequeue_time;	/* release time, compared against gettimeofdayms() */
+	int dir;	/* direction; used as index into the two-element queue arrays */
+};
+
+/* NOTE(review): outqueue_delay is not referenced by the code visible in this
+ * patch -- confirm whether it is still needed. */
+static unsigned long outqueue_delay;
+/* Per-direction list of packets waiting for their dequeue_time, filled by pkt_enqueue_in(). */
+static struct wf_packet *wf_queue_in[2];
+/* Per-direction list of packets waiting to be written, filled by pkt_enqueue_out(). */
+static struct wf_packet *wf_queue_out[2];
+
+/* Count the packets in the list that starts at p; an empty list gives 0. */
+int queue_size(struct wf_packet *p) {
+	int len;
+
+	for (len = 0; p != NULL; p = p->next)
+		len++;
+	return len;
+}
+
+/*
+ * Insert pkt into the list q, which is kept in non-decreasing dequeue_time
+ * order, and return the (possibly new) head of the list.
+ *
+ * pkt is placed in front of the first element whose dequeue_time is strictly
+ * greater than its own, so packets with equal times keep arrival order.
+ * The walk is iterative so that stack usage does not grow with the queue
+ * length, and pkt->next is always set here rather than left to the caller.
+ */
+static struct wf_packet *_pkt_enqueue(struct wf_packet *q, struct wf_packet *pkt)
+{
+	struct wf_packet **link = &q;
+
+	while (*link != NULL && !(pkt->dequeue_time < (*link)->dequeue_time))
+		link = &(*link)->next;
+	pkt->next = *link;
+	*link = pkt;
+	return q;
+}
+
+/*
+ * Add pkt to the ingress queue of its direction (wf_queue_in[pkt->dir]),
+ * ordered by dequeue_time, and log the resulting queue length on stderr.
+ */
+static void pkt_enqueue_in(struct wf_packet *pkt)
+{
+	struct wf_packet *q = wf_queue_in[pkt->dir];
+	pkt->next = NULL;
+	wf_queue_in[pkt->dir] = _pkt_enqueue(q, pkt);
+	fprintf(stderr,"enqueued[%d]. Size now: %d\n", pkt->dir, queue_size(wf_queue_in[pkt->dir]));
+}
+
+/*
+ * Add pkt to the output queue of its direction (wf_queue_out[pkt->dir]),
+ * ordered by dequeue_time, and log the resulting queue length on stderr.
+ */
+static void pkt_enqueue_out(struct wf_packet *pkt)
+{
+	struct wf_packet *q = wf_queue_out[pkt->dir];
+	pkt->next = NULL;
+	wf_queue_out[pkt->dir] = _pkt_enqueue(q, pkt);
+	fprintf(stderr,"============= OUT =========== enqueued[%d]. Size now: %d\n", pkt->dir, queue_size(wf_queue_out[pkt->dir]));
+}
+
+/*
+ * Tell whether the head of the ingress queue for `dir` is due.
+ * Returns 1 when the queue is non-empty and its first packet has a
+ * dequeue_time that is not in the future, 0 otherwise.
+ */
+static int is_time_to_dequeue(int dir)
+{
+	const unsigned long long now = gettimeofdayms();
+	const struct wf_packet *head = wf_queue_in[dir];
+
+	if (head == NULL)
+		return 0;
+	return head->dequeue_time <= now;
+}
+
+/*
+ * Write out the packets at the head of both output queues that are allowed
+ * to leave now.
+ *
+ * For each direction the head packet is sent immediately when the sampled
+ * BAND value is 0.  Otherwise its dequeue_time is recomputed as
+ *   last_out + (pkt->size + bytes already sent in this call) * 1000 / bandval
+ * and the packet is sent only if that time has been reached.  The two
+ * directions are scanned repeatedly until a whole pass sends nothing.
+ *
+ * Returns the total number of bytes written in both directions.
+ */
+static int process_queue_out(void)
+{
+	static unsigned long long now, last_out[2] = {0ULL, 0ULL};
+	struct wf_packet *pkt;
+	int i, count[2] = {0}, old_count[2] = {0};
+	do {
+		/* remember the totals to detect a pass with no progress */
+		old_count[0] = count[0];
+		old_count[1] = count[1];
+		for (i = 0; i < 2; i++) {
+			double bandval;
+			pkt = wf_queue_out[i];
+			if (!pkt)
+				continue;
+			bandval = compute_wirevalue(BAND,i);
+			if (bandval == 0) {
+				/* no bandwidth limit: send right away */
+				writepacket(i, pkt->payload, pkt->size);
+				wf_queue_out[i] = pkt->next;
+				count[i] += pkt->size;
+				last_out[i] = gettimeofdayms();
+				free(pkt);
+			} else {
+				now = gettimeofdayms();
+				pkt->dequeue_time = (unsigned long long) ((double)last_out[i] + (((double)(pkt->size + count[i])*1000) / bandval));
+				if (now >= pkt->dequeue_time) {
+					writepacket(i, pkt->payload, pkt->size);
+					wf_queue_out[i] = pkt->next;
+					count[i] += pkt->size;
+					last_out[i] = now;
+					free(pkt);
+				}
+			}
+		}
+	} while (count[0] > old_count[0] || count[1] > old_count[1]);
+
+	if (count[0] > 0)
+		fprintf(stderr,">>------------> OUT process queue: %d bytes transferred\n", count[0]);
+	if (count[1] > 0)
+		fprintf(stderr,"<------------<< OUT process queue: %d bytes transferred\n", count[1]);
+	return count[0] + count[1];
+}
+
+/*
+ * Move packets whose ingress delay has expired from the ingress queues to
+ * the output queues.  At most one packet per direction is moved per call.
+ *
+ * Returns the number of packets moved (0, 1 or 2).
+ */
+static int process_queue_in(void)
+{
+	struct wf_packet *p;
+	int i, count = 0;
+	for (i = 0; i < 2; i++) {
+		if(is_time_to_dequeue(i)) {
+			/* unlink the head before handing it to the output queue */
+			p = wf_queue_in[i];
+			wf_queue_in[i] = p->next;
+			pkt_enqueue_out(p);
+			count++;
+		}
+	}
+	if (count > 0)
+		fprintf(stderr,"process queue: %d packets transferred\n", count);
+	return count;
+}
+
+/*
+ * Remove pkt from the list q and free it.
+ *
+ * Returns the head of the list after the removal.  If pkt is not part of
+ * the list nothing is freed and the list is returned unchanged.
+ *
+ * The successor is read before pkt is released: reading it afterwards
+ * (free(pkt) followed by q->next with q == pkt) is a use-after-free.
+ */
+static struct wf_packet
+*pkt_discard(struct wf_packet *q, struct wf_packet *pkt)
+{
+	struct wf_packet **link = &q;
+
+	while (*link != NULL && *link != pkt)
+		link = &(*link)->next;
+	if (*link != NULL) {
+		*link = pkt->next;
+		free(pkt);
+	}
+	return q;
+}
+
+
static void printoutc(int fd, const char *format, ...);
/* markov node mgmt */
static inline struct markov_node *markov_node_new(void)
@@ -235,16 +412,6 @@
}
}
-#define BUFSIZE 2048
-#define MAXCMD 128
-#define MGMTMODEARG 129
-#define DAEMONIZEARG 130
-#define PIDFILEARG 131
-#define LOGSOCKETARG 132
-#define LOGIDARG 133
-#define KILO (1<<10)
-#define MEGA (1<<20)
-#define GIGA (1<<30)
static inline double max_wirevalue(int node,int tag, int dir)
{
@@ -263,30 +430,6 @@
srand48(v.tv_sec ^ v.tv_usec ^ getpid());
}
-/*more than 98% inside the bell */
-#define SIGMA (1.0/3.0)
-static double compute_wirevalue(int tag, int dir)
-{
- struct wirevalue *wv=&WFVAL(markov_current,tag,dir);
- if (wv->plus == 0)
- return wv->value;
- switch (wv->alg) {
- case ALGO_UNIFORM:
- return wv->value+wv->plus*((drand48()*2.0)-1.0);
- case ALGO_GAUSS_NORMAL:
- {
- double x,y,r2;
- do {
- x = (2*drand48())-1;
- y = (2*drand48())-1;
- r2=x*x+y*y;
- } while (r2 >= 1.0);
- return wv->value+wv->plus* SIGMA * x * sqrt (
(-2 * log(r2)) /r2);
- }
- default:
- return 0.0;
- }
-}
void printlog(int priority, const char *format, ...)
{
@@ -544,52 +687,110 @@
}
}
-void handle_packet(int dir,const unsigned char *buf,int size)
+/*
+ * Estimate how long the bytes currently waiting in wf_queue_out[dir] need to
+ * drain at the maximum BAND value of the current Markov node.
+ *
+ * Returns 1000 * queued_bytes / bandwidth (logged as milliseconds), or 0
+ * when the bandwidth value is 0.  The parameter type is spelled out because
+ * implicit int is not valid since C99.
+ */
+unsigned long time_in_outqueue(int dir)
{
+	unsigned long bytes_in_queue = 0;
+	struct wf_packet *pkt = wf_queue_out[dir];
+	double bw_val = max_wirevalue(markov_current,BAND,dir);
+	unsigned long timetogo = 0;
+
+	if (!bw_val) {
+		return 0;
+	}
+	while(pkt) {
+		bytes_in_queue += pkt->size;
+		pkt = pkt->next;
+	}
+	timetogo = 1000 * (bytes_in_queue / bw_val);
+	fprintf(stderr,"Time that will be spent in out queue: %lu ms (queue size: %lu B, speed: %.2f B/s)\n",
+			timetogo, bytes_in_queue, bw_val);
+	return timetogo;
+}
+
+/*
+ * Set pkt->dequeue_time, the time at which pkt may leave the ingress queue.
+ *
+ * The time stays 0 (release immediately) unless the DELAY parameter of the
+ * current Markov node is active and the sampled delay is positive; in that
+ * case it becomes now + delay - time_in_outqueue(pkt->dir), so the time the
+ * packet is expected to spend in the output queue is taken off the delay.
+ */
+void set_ingres_delay(struct wf_packet *pkt)
+{
+	/* No bandwidth-induced delay is computed here: the value is always 0,
+	 * now written out instead of being hidden in a chained assignment. */
+	int banddelay = 0;
+
+	pkt->dequeue_time = 0;
+	if (banddelay >= 0) {
+		if (banddelay > 0 || max_wirevalue(markov_current,DELAY,pkt->dir) > 0) {
+			double delval=compute_wirevalue(DELAY,pkt->dir);
+			/* a negative sample counts as no extra delay */
+			delval=(delval >= 0)?delval+banddelay:banddelay;
+			if (delval > 0) {
+				unsigned long long now = gettimeofdayms();
+				pkt->dequeue_time = now + delval - time_in_outqueue(pkt->dir);
+			}
+		}
+	}
+}
+
+void handle_packet(struct wf_packet *pkt)
+{
+ int times=1;
/* MTU */
/* if the packet is incosistent with the MTU of the line just drop it */
- if (min_wirevalue(markov_current,MTU,dir) > 0 && size >
min_wirevalue(markov_current,MTU,dir))
+ if (min_wirevalue(markov_current,MTU,pkt->dir) > 0 && pkt->size >
min_wirevalue(markov_current,MTU,pkt->dir)) {
+ free(pkt);
return;
+ }
/* LOSS */
/* Total packet loss */
- if (min_wirevalue(markov_current,LOSS,dir) >= 100.0)
+ if (min_wirevalue(markov_current,LOSS,pkt->dir) >= 100.0) {
+ free(pkt);
return;
+ }
/* probabilistic loss */
- if (max_wirevalue(markov_current,LOSTBURST,dir) > 0) {
+ if (max_wirevalue(markov_current,LOSTBURST,pkt->dir) > 0) {
/* Gilbert model */
- double losval=compute_wirevalue(LOSS,dir)/100;
- double burstlen=compute_wirevalue(LOSTBURST,dir);
+ double losval=compute_wirevalue(LOSS,pkt->dir)/100;
+ double burstlen=compute_wirevalue(LOSTBURST,pkt->dir);
double alpha=losval / (burstlen*(1-losval));
double beta=1.0 / burstlen;
- switch (loss_status[dir]) {
+ switch (loss_status[pkt->dir]) {
case OK_BURST:
- if (drand48() < alpha)
loss_status[dir]=FAULTY_BURST;
+ if (drand48() < alpha)
loss_status[pkt->dir]=FAULTY_BURST;
break;
case FAULTY_BURST:
- if (drand48() < beta) loss_status[dir]=OK_BURST;
+ if (drand48() < beta)
loss_status[pkt->dir]=OK_BURST;
break;
}
- if (loss_status[dir] != OK_BURST)
+ if (loss_status[pkt->dir] != OK_BURST) {
+ free(pkt);
return;
+ }
} else {
- loss_status[dir] = OK_BURST;
- if (max_wirevalue(markov_current,LOSS,dir) > 0) {
+ loss_status[pkt->dir] = OK_BURST;
+ if (max_wirevalue(markov_current,LOSS,pkt->dir) > 0) {
/* standard non bursty model */
- double losval=compute_wirevalue(LOSS,dir)/100;
- if (drand48() < losval)
+ double losval=compute_wirevalue(LOSS,pkt->dir)/100;
+ if (drand48() < losval) {
+ free(pkt);
return;
+ }
}
}
/* DUP */
/* times is the number of dup packets */
- int times=1;
- if (max_wirevalue(markov_current,DDUP,dir) > 0) {
- double dupval=compute_wirevalue(DDUP,dir)/100;
+ if (max_wirevalue(markov_current,DDUP,pkt->dir) > 0) {
+ double dupval=compute_wirevalue(DDUP,pkt->dir)/100;
while (drand48() < dupval)
times++;
}
+ while (times > 0) {
+ struct wf_packet *pkt_in;
+ if (times > 1) {
+ pkt_in = malloc(sizeof(struct wf_packet));
+ memcpy(pkt_in, pkt, sizeof(struct wf_packet));
+ } else
+ pkt_in = pkt;
+ set_ingres_delay(pkt_in);
+ pkt_enqueue_in(pkt_in);
+ times--;
+ }
+
+#if 0
while (times>0) {
int banddelay=0;
@@ -657,71 +858,84 @@
}
times--;
}
+#endif
}
#define MIN(X,Y) (((X)<(Y))?(X):(Y))
-static void splitpacket(const unsigned char *buf,int size,int dir)
+static void splitpacket(struct wf_packet *pkt)
{
static unsigned char fragment[BUFSIZE][2];
static unsigned char *fragp[2];
static unsigned int rnx[2],remaining[2];
+ unsigned short size = pkt->size;
//fprintf(stderr,"%s: splitpacket rnx=%d remaining=%d
size=%d\n",progname,rnx[dir],remaining[dir],size);
- if (size==0) return;
- if (rnx[dir]>0) {
- register int amount=MIN(remaining[dir],size);
+ if (pkt->size==0) return;
+ if (rnx[pkt->dir]>0) {
+ register int amount=MIN(remaining[pkt->dir],pkt->size);
//fprintf(stderr,"%s: fragment amount %d\n",progname,amount);
- memcpy(fragp[dir],buf,amount);
- remaining[dir]-=amount;
- fragp[dir]+=amount;
- buf+=amount;
+ memcpy(fragp[pkt->dir],pkt->payload,amount);
+ remaining[pkt->dir]-=amount;
+ fragp[pkt->dir]+=amount;
size-=amount;
- if (remaining[dir]==0) {
+ if (remaining[pkt->dir]==0) {
//fprintf(stderr,"%s: delivered defrag
%d\n",progname,rnx[dir]);
- handle_packet(dir,fragment[dir],rnx[dir]+2);
- rnx[dir]=0;
+ pkt->size = rnx[pkt->dir]+2;
+ memcpy(pkt->payload, fragment[pkt->dir],
rnx[pkt->dir]+2);
+ handle_packet(pkt);
+ rnx[pkt->dir]=0;
}
}
while (size > 0) {
- rnx[dir]=(buf[0]<<8)+buf[1];
- //fprintf(stderr,"%s: packet %d size %d %x %x dir
%d\n",progname,rnx[dir],size-2,buf[0],buf[1],dir);
- if (rnx[dir]>1521) {
- printlog(LOG_WARNING,"Packet length error size %d rnx
%d",size,rnx[dir]);
- rnx[dir]=0;
+ rnx[pkt->dir]=(pkt->payload[0]<<8)+pkt->payload[1];
+ //fprintf(stderr,"%s: packet %d pkt->size %d %x %x pkt->dir
%d\n",progname,rnx[pkt->dir],pkt->size-2,pkt->payload[0],pkt->payload[1],pkt->dir);
+ if (rnx[pkt->dir]>1521) {
+ printlog(LOG_WARNING,"Packet length error pkt->size %d
rnx %d",pkt->size,rnx[pkt->dir]);
+ rnx[pkt->dir]=0;
return;
}
- if (rnx[dir]+2 > size) {
- //fprintf(stderr,"%s: begin defrag
%d\n",progname,rnx[dir]);
- fragp[dir]=fragment[dir];
- memcpy(fragp[dir],buf,size);
- remaining[dir]=rnx[dir]+2-size;
- fragp[dir]+=size;
+ if (rnx[pkt->dir]+2 > size) {
+ //fprintf(stderr,"%s: begin defrag
%d\n",progname,rnx[pkt->dir]);
+ fragp[pkt->dir]=fragment[pkt->dir];
+ memcpy(fragp[pkt->dir],pkt->payload,pkt->size);
+ remaining[pkt->dir]=rnx[pkt->dir]+2-size;
+ fragp[pkt->dir]+=size;
size=0;
} else {
- handle_packet(dir,buf,rnx[dir]+2);
- buf+=rnx[dir]+2;
- size-=rnx[dir]+2;
- rnx[dir]=0;
+ pkt->size = rnx[pkt->dir]+2;
+ handle_packet(pkt);
+ size-=rnx[pkt->dir]+2;
+ rnx[pkt->dir]=0;
}
}
}
+
+
-static void packet_in(int dir)
+static int packet_in(int dir)
{
- unsigned char buf[BUFSIZE];
+ struct wf_packet *pkt;
int n;
+
+ pkt = malloc(sizeof(struct wf_packet));
+ pkt->next = NULL;
+ pkt->dir = dir;
if(vdeplug[dir]) {
- n=vde_recv(vdeplug[dir],buf+2,BUFSIZE-2,0);
- buf[0]=n>>8;
- buf[1]=n&0xFF;
- handle_packet(dir,buf,n+2);
+ n=vde_recv(vdeplug[dir],pkt->payload + 2,BUFSIZE-2,0);
+ pkt->payload[0]=n>>8;
+ pkt->payload[1]=n&0xFF;
+ pkt->size = (unsigned short)n + 2;
+ handle_packet(pkt);
} else {
- n=read(pfd[dir].fd,buf,BUFSIZE);
- if (n == 0)
+ n = read(pfd[dir].fd,pkt->payload,BUFSIZE);
+ if (n <= 0)
exit (0);
- splitpacket(buf,n,dir);
+ pkt->size = (unsigned short)n;
+ splitpacket(pkt);
}
+ fprintf(stderr,"Packet In: %d\n",n);
+ return n;
}
static int check_open_fifos_n_plugs(struct pollfd *pfd,int *outfd,char
*vdepath[],VDECONN *vdeplug[])
@@ -1666,7 +1880,7 @@
}
}
}
- n=poll(pfd,npfd,delay);
+ n=poll(pfd,npfd,1);
if (pfd[0].revents & POLLHUP || (ndirs>1 && pfd[1].revents &
POLLHUP))
exit(0);
if (pfd[0].revents & POLLIN) {
@@ -1700,6 +1914,8 @@
exit(0);*/
}
markov_try();
- packet_dequeue();
+ //packet_dequeue();
+ process_queue_out();
+ process_queue_in();
}
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Xperia(TM) PLAY
It's a major breakthrough. An authentic gaming
smartphone on the nation's most reliable network.
And it wants your games.
http://p.sf.net/sfu/verizon-sfdev
_______________________________________________
vde-users mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/vde-users