Revision: 490 http://vde.svn.sourceforge.net/vde/?rev=490&view=rev Author: danielel Date: 2011-04-09 14:26:09 +0000 (Sat, 09 Apr 2011)
Log Message: ----------- * Mark lost packets to be discarded in the output queue * Introduce -c (drop tail) and -r (Random Early Detection) for the ingress queue (linked to delay) * Remove unused code * RED and Drop Tail debug messages are still in place. Modified Paths: -------------- branches/danielinux-wirefilter2/src/wirefilter.c Modified: branches/danielinux-wirefilter2/src/wirefilter.c =================================================================== --- branches/danielinux-wirefilter2/src/wirefilter.c 2011-04-08 11:43:36 UTC (rev 489) +++ branches/danielinux-wirefilter2/src/wirefilter.c 2011-04-09 14:26:09 UTC (rev 490) @@ -38,7 +38,10 @@ #include <vde.h> #include <vdecommon.h> #include <libvdeplug.h> +#include <stdint.h> +#define min(a,b) a<b?a:b + #if defined(VDE_DARWIN) || defined(VDE_FREEBSD) # if defined HAVE_SYSLIMITS_H # include <syslimits.h> @@ -117,9 +120,8 @@ char loss_status[2]; /* Gilbert model Markov chain status */ struct timeval nextband[2]; struct timeval nextspeed[2]; -int nofifo; +int nofifo = 0; int ndirs; //1 mono directional, 2 bi directional filter (always 2 with -v) -int delay_bufsize[2]; //total size of delayed packets char *vdepath[2]; //path of the directly connected switched (via vde_plug) VDECONN *vdeplug[2]; //vde_plug connections (if NULL stdin/stdout) int daemonize; // daemon mode @@ -167,17 +169,76 @@ /*** WF 2 ***/ +struct red_parms { + int enabled; + uint32_t min; + uint32_t max; + double P; + uint32_t limit; +}; + +static struct red_parms red[2]; + +void red_set_parms(struct red_parms *p, uint32_t min, uint32_t max, double P, uint32_t limit) +{ + p->min = min; + p->max = max; + p->P = P; + p->limit = limit; + p->enabled=1; +} + +int parse_red(char *arg, struct red_parms *p){ + unsigned rmin=0,rmax=0,limit=0; + double probability=0; + int direction; + if (strncmp(arg,"LR",2)==0){ + direction=LR; + arg+=2; + } else if(strncmp(arg,"RL",2)==0){ + direction=RL; + arg+=2; + } else direction = 2; + + if ( + 
sscanf(arg,"%lu,%lu,%lf,%lu",&rmin,&rmax,&probability,&limit)<=0 + || (!rmin || !rmax || !limit || probability <= 0) + ){ + fprintf(stderr,"Failed to set RED parameters. Red disabled.\n"); + return -1; + } + fprintf(stderr,"red min=%lu, max=%lu, prob=%lf, limit=%lu\n", rmin,rmax,probability,limit); + switch (direction){ + case LR: + red_set_parms(&p[LR],rmin,rmax,probability,limit); + return 0; + case RL: + red_set_parms(&p[LR],rmin,rmax,probability,limit); + red_set_parms(&p[RL],rmin,rmax,probability,limit); + return 0; + case 2: + red_set_parms(&p[LR],rmin,rmax,probability,limit); + red_set_parms(&p[RL],rmin,rmax,probability,limit); + return 0; + } + + return -1; +} + +#define WFP_LOSS 0x01 struct wf_packet { struct wf_packet *next; unsigned char payload[BUFSIZE]; unsigned short size; unsigned long long dequeue_time; int dir; + unsigned char flags; }; static unsigned long outqueue_delay; -static struct wf_packet *wf_queue_in[2]; -static struct wf_packet *wf_queue_out[2]; +static struct wf_packet *wf_queue_in[2], *wf_queue_in_tail[2]; +static struct wf_packet *wf_queue_out[2], *wf_queue_out_tail[2]; +static unsigned long queue_size_in[2], queue_size_out[2]; int queue_size(struct wf_packet *p) { int n = 0; @@ -190,10 +251,11 @@ static struct wf_packet *_pkt_enqueue(struct wf_packet *q, struct wf_packet *pkt) { - if (!q) + if (!q) { return pkt; - - if (pkt->dequeue_time < q->dequeue_time) { + } + if ((nofifo) && (pkt->dequeue_time < q->dequeue_time)) { + fprintf(stderr,"reordering...\n"); pkt->next = q; return pkt; } @@ -205,17 +267,29 @@ { struct wf_packet *q = wf_queue_in[pkt->dir]; pkt->next = NULL; + queue_size_in[pkt->dir] += pkt->size; + if (!q) { + wf_queue_in[pkt->dir] = pkt; + wf_queue_in_tail[pkt->dir] = pkt; + return; + } + if (!nofifo && wf_queue_in_tail[pkt->dir]) { + wf_queue_in_tail[pkt->dir]->next = pkt; + wf_queue_in_tail[pkt->dir] = pkt; + return; + } wf_queue_in[pkt->dir] = _pkt_enqueue(q, pkt); - fprintf(stderr,"enqueued[%d]. 
Size now: %d\n", pkt->dir, queue_size(wf_queue_in[pkt->dir])); + //fprintf(stderr,"enqueued[%d]. Size now: %d\n", pkt->dir, queue_size(wf_queue_in[pkt->dir])); } static void pkt_enqueue_out(struct wf_packet *pkt) { struct wf_packet *q = wf_queue_out[pkt->dir]; + queue_size_out[pkt->dir] += pkt->size; pkt->next = NULL; wf_queue_out[pkt->dir] = _pkt_enqueue(q, pkt); - fprintf(stderr,"============= OUT =========== enqueued[%d]. Size now: %d\n", pkt->dir, queue_size(wf_queue_out[pkt->dir])); + //fprintf(stderr,"============= OUT =========== enqueued[%d]. Size now: %d\n", pkt->dir, queue_size(wf_queue_out[pkt->dir])); } static int is_time_to_dequeue(int dir) @@ -241,8 +315,9 @@ continue; bandval = compute_wirevalue(BAND,i); if (bandval == 0) { - writepacket(i, pkt->payload, pkt->size); + writepacket(pkt); wf_queue_out[i] = pkt->next; + queue_size_out[pkt->dir] -= pkt->size; count[i] += pkt->size; last_out[i] = gettimeofdayms(); free(pkt); @@ -250,8 +325,9 @@ now = gettimeofdayms(); pkt->dequeue_time = (unsigned long long) ((double)last_out[i] + (((double)(pkt->size + count[i])*1000) / bandval)); if (now >= pkt->dequeue_time) { - writepacket(i, pkt->payload, pkt->size); + writepacket(pkt); wf_queue_out[i] = pkt->next; + queue_size_out[pkt->dir] -= pkt->size; count[i] += pkt->size; last_out[i] = now; free(pkt); @@ -259,11 +335,12 @@ } } } while (count[0] > old_count[0] || count[1] > old_count[1]); - + /* if (count[0] > 0) fprintf(stderr,">>------------> OUT process queue: %d bytes transferred\n", count[0]); if (count[1] > 0) fprintf(stderr,"<------------<< OUT process queue: %d bytes transferred\n", count[1]); + */ return count[0] + count[1]; } @@ -275,12 +352,15 @@ if(is_time_to_dequeue(i)) { p = wf_queue_in[i]; wf_queue_in[i] = p->next; + queue_size_in[p->dir] -= p->size; pkt_enqueue_out(p); count++; } } + /* if (count > 0) fprintf(stderr,"process queue: %d packets transferred\n", count); + */ return count; } @@ -562,164 +642,69 @@ return write(outfd[dir],buf,size); } 
-int writepacket(int dir,const unsigned char *buf,int size) +int writepacket(struct wf_packet *pkt) { + if (pkt->flags & WFP_LOSS) { + //fprintf(stderr, "PACKET LOSS ********************\n"); + return 0; + } /* NOISE */ - if (max_wirevalue(markov_current,NOISE,dir) > 0) { - double noiseval=compute_wirevalue(NOISE,dir); + if (max_wirevalue(markov_current,NOISE,pkt->dir) > 0) { + double noiseval=compute_wirevalue(NOISE,pkt->dir); int nobit=0; - while ((drand48()*8*MEGA) < (size-2)*8*noiseval) + while ((drand48()*8*MEGA) < (pkt->size-2)*8*noiseval) nobit++; if (nobit>0) { unsigned char noisedpacket[BUFSIZE]; - memcpy(noisedpacket,buf,size); + memcpy(noisedpacket,pkt->payload,pkt->size); while(nobit>0) { - int flippedbit=(drand48()*size*8); + int flippedbit=(drand48()*pkt->size*8); noisedpacket[(flippedbit >> 3) + 2] ^= 1<<(flippedbit & 0x7); nobit--; } - return outpacket(dir,noisedpacket,size); + return outpacket(pkt->dir,noisedpacket,pkt->size); } else - return outpacket(dir,buf,size); + return outpacket(pkt->dir,pkt->payload,pkt->size); } else - return outpacket(dir,buf,size); + return outpacket(pkt->dir,pkt->payload,pkt->size); } -/* packet queues are priority queues implemented on a heap. - * enqueue time = dequeue time = O(log n) max&mean - */ -/* the delay is evaluated in milliseconds, several packets can be - scheduled at the same "when" time. Counter preserve the fifoness. 
*/ - -static void packet_dequeue() +unsigned long time_in_queue(struct wf_packet *pkt) { - struct timeval v; - gettimeofday(&v,NULL); - unsigned long long now=(unsigned long long)v.tv_sec*1000+v.tv_usec/1000; - /* the next packet (min time, min counter) is in the root of - the packetqueue heap */ - while (npq>0 && pqh[1]->when <= now) { - struct packpq *old=pqh[npq--]; - int k=1; - delay_bufsize[pqh[1]->dir] -= pqh[1]->size; - writepacket(pqh[1]->dir,pqh[1]->buf,pqh[1]->size); - free(pqh[1]->buf); - free(pqh[1]); - /* rebuild the heap */ - while (k<= npq>>1) - { - int j= k<<1; - /* choose the min between pqh[2k] and pqh[2k+1] */ - if (j<npq && - (pqh[j]->when > pqh[j+1]->when || - (pqh[j]->when == pqh[j+1]->when && - pqh[j]->counter > pqh[j+1]->counter) - ) - ) j++; - /* if old must be put here, okay else move the min up and - continue the rebuilding phase */ - if (old->when < pqh[j]->when || - (old->when == pqh[j]->when && - old->counter < pqh[j]->counter) - ) - break; - else { - pqh[k]=pqh[j];k=j; - } - } - pqh[k]=old; - } -} - -static void packet_enqueue(int dir,const unsigned char *buf,int size,int delms) -{ - struct timeval v; - - struct packpq *new=malloc(sizeof(struct packpq)); - if (new==NULL) { - printlog(LOG_WARNING,"malloc elem %s",strerror(errno)); - exit (1); - } - gettimeofday(&v,NULL); - new->when= ((unsigned long long)v.tv_sec * 1000 + v.tv_usec/1000) + delms; - if (new->when > maxwhen) { - maxwhen=new->when; - counter=0; - } - if (!nofifo && new->when <= maxwhen) { - new->when=maxwhen; - counter++; - } - new->counter=counter; - new->dir=dir; - new->buf=malloc(size); - if (new->buf==NULL) { - printlog(LOG_WARNING,"malloc elem buf %s",strerror(errno)); - exit (1); - } - memcpy(new->buf,buf,size); - new->size=size; - delay_bufsize[dir]+=size; - if (pqh==NULL) { - pqh=malloc(PQCHUNK*sizeof(struct packpq *)); - if (pqh==NULL) { - printlog(LOG_WARNING,"malloc %s",strerror(errno)); - exit (1); - } - pqh[0]=&sentinel; maxpq=PQCHUNK; - } - if (npq >= maxpq) 
{ - pqh=realloc(pqh,(maxpq=maxpq+PQCHUNK) * sizeof(struct packpq *)); - if (pqh==NULL) { - printlog(LOG_WARNING,"malloc %s",strerror(errno)); - exit (1); - } - } - { - int k=++npq; - /* add the new element to the heap */ - while (new->when < pqh[k>>1]->when || - (new->when == pqh[k>>1]->when && new->counter < pqh[k>>1]->counter)) { - pqh[k]=pqh[k>>1]; - k >>= 1; - } - pqh[k]=new; - } -} - -unsigned long time_in_outqueue(dir) -{ unsigned long bytes_in_queue = 0; - struct wf_packet *pkt = wf_queue_out[dir]; - double bw_val = max_wirevalue(markov_current,BAND,dir); + double bw_val; unsigned long timetogo = 0; + if (!pkt) + return 0U; + bw_val = max_wirevalue(markov_current,BAND,pkt->dir); + if (!bw_val) { - return 0; + return 0U; } while(pkt) { bytes_in_queue += pkt->size; pkt = pkt->next; } timetogo = 1000 * (bytes_in_queue / bw_val); +/* fprintf(stderr,"Time that will be spent in out queue: %lu ms (queue size: %lu B, speed: %.2f B/s)\n", timetogo, bytes_in_queue, bw_val); +*/ return timetogo; } void set_ingres_delay(struct wf_packet *pkt) { - int banddelay = - pkt->dequeue_time = 0; - if (banddelay >= 0) { - if (banddelay > 0 || max_wirevalue(markov_current,DELAY,pkt->dir) > 0) { - double delval=compute_wirevalue(DELAY,pkt->dir); - delval=(delval >= 0)?delval+banddelay:banddelay; - if (delval > 0) { - struct timeval tv; - unsigned long long now = gettimeofdayms(); - pkt->dequeue_time = now + delval - time_in_outqueue(pkt->dir); - } + pkt->dequeue_time = 0U; + if (max_wirevalue(markov_current,DELAY,pkt->dir) > 0) { + double delval=compute_wirevalue(DELAY,pkt->dir); + unsigned long banddelay = time_in_queue(wf_queue_in[pkt->dir]); + delval=(delval >= 0)?delval+banddelay:banddelay; + if (delval > 0) { + struct timeval tv; + unsigned long long now = gettimeofdayms(); + pkt->dequeue_time = now + delval - banddelay; } } } @@ -727,6 +712,9 @@ void handle_packet(struct wf_packet *pkt) { int times=1; + int chanbuf; + pkt->flags = 0; + /* MTU */ /* if the packet is 
incosistent with the MTU of the line just drop it */ if (min_wirevalue(markov_current,MTU,pkt->dir) > 0 && pkt->size > min_wirevalue(markov_current,MTU,pkt->dir)) { @@ -737,8 +725,7 @@ /* LOSS */ /* Total packet loss */ if (min_wirevalue(markov_current,LOSS,pkt->dir) >= 100.0) { - free(pkt); - return; + pkt->flags |= WFP_LOSS; } /* probabilistic loss */ if (max_wirevalue(markov_current,LOSTBURST,pkt->dir) > 0) { @@ -756,8 +743,7 @@ break; } if (loss_status[pkt->dir] != OK_BURST) { - free(pkt); - return; + pkt->flags |= WFP_LOSS; } } else { loss_status[pkt->dir] = OK_BURST; @@ -765,8 +751,7 @@ /* standard non bursty model */ double losval=compute_wirevalue(LOSS,pkt->dir)/100; if (drand48() < losval) { - free(pkt); - return; + pkt->flags |= WFP_LOSS; } } } @@ -786,94 +771,57 @@ } else pkt_in = pkt; set_ingres_delay(pkt_in); - pkt_enqueue_in(pkt_in); - times--; - } - -#if 0 - while (times>0) { - int banddelay=0; - - /* CHANBUFSIZE */ - /* when bandwidth is limited, packets exceeding channel bufsize are discarded */ - if (max_wirevalue(markov_current,CHANBUFSIZE,dir) > 0) { - double capval=compute_wirevalue(CHANBUFSIZE,dir); - if ((delay_bufsize[dir]+size) > capval) - return; - } - - /* SPEED */ - /* speed limit, if packets arrive too fast, delay the sender */ - if (max_wirevalue(markov_current,SPEED,dir) > 0) { - double speedval=compute_wirevalue(SPEED,dir); - if (speedval<=0) return; - if (speedval>0) { - unsigned int commtime=((unsigned)size)*1000000/((unsigned int)speedval); - struct timeval tv; - gettimeofday(&tv,NULL); - banddelay=commtime/1000; - if (timercmp(&tv,&nextspeed[dir], > )) - nextspeed[dir]=tv; - nextspeed[dir].tv_usec += commtime; - nextspeed[dir].tv_sec += nextspeed[dir].tv_usec / 1000000; - nextspeed[dir].tv_usec %= 1000000; + /* RED */ + double red_probability; + if (red[pkt_in->dir].enabled){ + if (red[pkt_in->dir].min > queue_size_in[pkt_in->dir]) { + goto RED_PASS; + } else if (red[pkt_in->dir].max > queue_size_in[pkt_in->dir]) { + 
red_probability = red[pkt_in->dir].P * + ((double)queue_size_in[pkt_in->dir] - (double)red[pkt_in->dir].min) / + ((double)red[pkt_in->dir].max - (double)red[pkt_in->dir].min); + } else if (queue_size_in[pkt_in->dir] < red[pkt_in->dir].limit) { + red_probability = red[pkt_in->dir].P; + } else { + fprintf(stderr,"RED: Hard limit drop.\n"); + free(pkt_in); + times--; + continue; } + if (drand48() < red_probability) { + fprintf(stderr,"RED: Probability drop. (red probability= %lf, queue size= %lu\n", red_probability, queue_size_in[pkt_in->dir]); + free(pkt_in); + times--; + continue; + } + + } else { + int drop_tail = max_wirevalue(markov_current, CHANBUFSIZE, pkt_in->dir); + if (drop_tail > 0 && drop_tail < queue_size_in[pkt_in->dir]) { + fprintf(stderr, "Drop Tail. Queue size: %lu, limit: %lu\n", queue_size_in[pkt_in->dir], drop_tail); + free(pkt_in); + times--; + continue; + } } - - /* BANDWIDTH */ - /* band, when band overflows, delay just the delivery */ - if (max_wirevalue(markov_current,BAND,dir) > 0) { - double bandval=compute_wirevalue(BAND,dir); - if (bandval<=0) return; - if (bandval >0) { - unsigned int commtime=((unsigned)size)*1000000/((unsigned int)bandval); - struct timeval tv; - gettimeofday(&tv,NULL); - if (timercmp(&tv,&nextband[dir], > )) { - nextband[dir]=tv; - banddelay=commtime/1000; - } else { - timersub(&nextband[dir],&tv,&tv); - banddelay=tv.tv_sec*1000 + (tv.tv_usec + commtime)/1000; - } - nextband[dir].tv_usec += commtime; - nextband[dir].tv_sec += nextband[dir].tv_usec / 1000000; - nextband[dir].tv_usec %= 1000000; - } else - banddelay=-1; - } - - /* DELAY */ - /* line delay */ - if (banddelay >= 0) { - if (banddelay > 0 || max_wirevalue(markov_current,DELAY,dir) > 0) { - double delval=compute_wirevalue(DELAY,dir); - delval=(delval >= 0)?delval+banddelay:banddelay; - if (delval > 0) { - packet_enqueue(dir,buf,size,(int) delval); - } else - writepacket(dir,buf,size); - } else - writepacket(dir,buf,size); - } + RED_PASS: + 
pkt_enqueue_in(pkt_in); times--; } -#endif } -#define MIN(X,Y) (((X)<(Y))?(X):(Y)) - static void splitpacket(struct wf_packet *pkt) { static unsigned char fragment[BUFSIZE][2]; static unsigned char *fragp[2]; static unsigned int rnx[2],remaining[2]; unsigned short size = pkt->size; + memset(red, 0, 2* sizeof(struct red_parms)); //fprintf(stderr,"%s: splitpacket rnx=%d remaining=%d size=%d\n",progname,rnx[dir],remaining[dir],size); if (pkt->size==0) return; if (rnx[pkt->dir]>0) { - register int amount=MIN(remaining[pkt->dir],pkt->size); + register int amount=min(remaining[pkt->dir],pkt->size); //fprintf(stderr,"%s: fragment amount %d\n",progname,amount); memcpy(fragp[pkt->dir],pkt->payload,amount); remaining[pkt->dir]-=amount; @@ -934,7 +882,7 @@ pkt->size = (unsigned short)n; splitpacket(pkt); } - fprintf(stderr,"Packet In: %d\n",n); + //fprintf(stderr,"Packet In: %d\n",n); return n; } @@ -952,7 +900,7 @@ alternate_stdout=atoi(env_out); if (vdepath[0]) { // -v selected if (strcmp(vdepath[0],"-") != 0) { - if((vdeplug[LR]=vde_open(vdepath[0],"vde_crosscable",NULL))==NULL){ + if((vdeplug[LR]=vde_open(vdepath[0],"vde_wirefilter",NULL))==NULL){ fprintf(stderr,"vdeplug %s: %s\n",vdepath[0],strerror(errno)); return -1; } @@ -960,7 +908,7 @@ pfd[0].events=POLLIN | POLLHUP; } if (strcmp(vdepath[1],"-") != 0) { - if((vdeplug[RL]=vde_open(vdepath[1],"vde_crosscable",NULL))==NULL){ + if((vdeplug[RL]=vde_open(vdepath[1],"vde_wirefilter",NULL))==NULL){ fprintf(stderr,"vdeplug %s: %s\n",vdepath[1],strerror(errno)); return -1; } @@ -1368,6 +1316,7 @@ printoutc(fd, "mtu set channel MTU (bytes)"); printoutc(fd, "chanbufsize set channel buffer size (bytes)"); printoutc(fd, "fifo set channel fifoness"); + printoutc(fd, "RED set channel random early detection algorithm min,max,probability,limit,burst,avpkt,bandwidth"); printoutc(fd, "shutdown shut the channel down"); printoutc(fd, "logout log out from this mgmt session"); printoutc(fd, "markov-numnodes n markov mode: set number of 
states"); @@ -1427,7 +1376,6 @@ printoutc(fd, "Cap. L->R %g+%g%c R->L %g+%g%c", WIREVALUE_FIELDS(node,CHANBUFSIZE,LR), WIREVALUE_FIELDS(node,CHANBUFSIZE,RL)); - printoutc(fd, "Current Delay Queue size: L->R %d R->L %d ",delay_bufsize[LR],delay_bufsize[RL]); } else { printoutc(fd, "Loss %g+%g%c", WIREVALUE_FIELDS(node,LOSS,0)); @@ -1446,7 +1394,6 @@ printoutc(fd, "MTU %g", min_wirevalue(node,MTU,0)); printoutc(fd, "Cap. %g+%g%c", WIREVALUE_FIELDS(node,CHANBUFSIZE,0)); - printoutc(fd, "Current Delay Queue size: %d",delay_bufsize[0]); } printoutc(fd,"Fifoness %s",(nofifo == 0)?"TRUE":"FALSE"); printoutc(fd,"Waiting packets in delay queues %d",npq); @@ -1639,6 +1586,7 @@ "\t--pidfile pidfile\n" "\t--blink blinksocket\n" "\t--blinkid blink_id_string\n" + "\t--RED min,max,probability,limit,burst,avpkt,bandwidth\n" ,progname); exit (1); } @@ -1681,7 +1629,7 @@ while(1) { int c; - c = GETOPT_LONG (argc, argv, "hl:n:d:M:D:m:b:s:c:v:L:f:", + c = GETOPT_LONG (argc, argv, "hl:n:d:M:D:m:b:s:c:v:L:f:r:", long_options, &option_index); if (c<0) break; @@ -1725,6 +1673,9 @@ case 'N': nofifo=1; break; + case 'r': + parse_red(optarg,red); + break; case 'v': { char *colon; @@ -1914,7 +1865,6 @@ exit(0);*/ } markov_try(); - //packet_dequeue(); process_queue_out(); process_queue_in(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. ------------------------------------------------------------------------------ Xperia(TM) PLAY It's a major breakthrough. An authentic gaming smartphone on the nation's most reliable network. And it wants your games. http://p.sf.net/sfu/verizon-sfdev _______________________________________________ vde-users mailing list vde-users@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/vde-users