Naturally I forgot to attach the code, as I was rushing to get to lunch.
R.harper
P.S. Why do my messages end up as replies to someone else's message in the archive?
_________________________________________________________________ Log på MSN Messenger direkte fra nettet http://webmessenger.msn.com/
/* Flow based Qdisc scheduler */
#include <linux/config.h>
#include <linux/module.h>
#include <linux/types.h>
#include <linux/kernel.h>
#include <linux/string.h>
#include <linux/mm.h>
#include <linux/socket.h>
#include <linux/sockios.h>
#include <linux/in.h>
#include <linux/errno.h>
#include <linux/interrupt.h>
#include <linux/if_ether.h>
#include <linux/inet.h>
#include <linux/ip.h>
#include <linux/tcp.h>
#include <linux/net.h>
#include <linux/timer.h>
#include <linux/netdevice.h>
#include <linux/etherdevice.h>
#include <linux/notifier.h>
#include <net/ip.h>
#include <net/route.h>
#include <linux/skbuff.h>
#include <net/sock.h>
#include <net/pkt_sched.h>
#include <linux/jhash.h>
#include <linux/netfilter_ipv4/ip_conntrack.h>
#include <linux/netfilter_ipv4/ip_conntrack_protocol.h>
#include <linux/netfilter_ipv4/ip_conntrack_tcp.h>
#include <linux/netfilter_ipv4/ip_tables.h>

#define LATENCY		100000
#define LIMIT		50		/* total packet limit used when q->limit == 0 */
#define HASH_SIZE	20		/* number of flow slots */
#define FLOW_LIFETIME	(10*HZ)		/* idle time (jiffies) before a slot may be reclaimed */
#define FLOW_HASH_SEED	0x543298ff

/*
 * TODO: a per-flow rate taken from conntrack (ct->proto.tcp.rate, guarded by
 * a tcp_lock) was experimented with; a fixed per-slot interval is used instead.
 */

struct flow_sched_data {
	u32 latency;				/* not referenced by this file */
	u32 limit;				/* max packets over all slots; 0 selects LIMIT */
	u32 qlen;				/* optional extra cap on sch->q.qlen; 0 disables */
	struct sk_buff_head qs[HASH_SIZE];	/* per-slot queues; slot 0 also takes unclassified traffic */
	u32 is_used[HASH_SIZE];			/* flow key owning each slot, 0 = free */
	psched_time_t last_pkt_sent[HASH_SIZE];	/* time of the last dequeue from each slot */
	psched_time_t next_send_time[HASH_SIZE];	/* earliest time each slot may send again */
	psched_time_t interval[HASH_SIZE];	/* pacing gap between packets of a slot */
	struct timer_list remove_timer[HASH_SIZE];	/* per-slot idle-expiry timers */
	struct timer_list timer;		/* watchdog that unthrottles the qdisc */
	struct sk_buff_head fasttrack;		/* NOTE(review): never used */
};

/*
 * get_hash - compute a 32-bit flow key for @skb.
 *
 * IPv4 TCP (first fragment or unfragmented): jhash of (saddr ^ protocol,
 * daddr, dport:sport). Other IPv4: jhash of (saddr, daddr, protocol).
 * Non-IPv4: 0.
 * Assumes the IP and TCP headers are in the linear skb data -- TODO confirm.
 */
static __inline__ u32 get_hash(struct sk_buff *skb)
{
	struct iphdr *iph;

	if (skb->protocol != __constant_htons(ETH_P_IP))
		return 0;

	iph = skb->nh.iph;
	/* Non-first fragments carry payload, not a TCP header, at this offset. */
	if (iph->protocol == IPPROTO_TCP &&
	    !(iph->frag_off & __constant_htons(IP_OFFSET))) {
		struct tcphdr *tcph = (void *)iph + (iph->ihl * 4);
		/* Widen before shifting: a u16 promotes to int and << 16 may overflow it. */
		u32 ports = ((u32)tcph->dest << 16) | (u32)tcph->source;

		return jhash_3words(iph->saddr ^ iph->protocol, iph->daddr,
				    ports, FLOW_HASH_SEED);
	}
	return jhash_3words(iph->saddr, iph->daddr, iph->protocol,
			    FLOW_HASH_SEED);
}

/* Mark @slot as owned by flow @hash and schedule its first send one interval from now. */
static __inline__ void flow_claim_slot(struct flow_sched_data *q, int slot, u32 hash)
{
	psched_time_t now;

	PSCHED_GET_TIME(now);
	q->is_used[slot] = hash;
	/* TADD2 writes the sum to its third argument and leaves 'now' untouched. */
	PSCHED_TADD2(now, q->interval[slot], q->next_send_time[slot]);
}

/*
 * flow_hash - find (or claim) the slot for @skb's flow.
 *
 * Linear probing from hash % HASH_SIZE. Returns the slot index, or -1 when
 * every slot is owned by another flow.
 */
static __inline__ short flow_hash(struct sk_buff *skb, struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	u32 hash = get_hash(skb);
	int start, slot, i;

	/* 0 marks a free slot in is_used[], so it can never be a flow key. */
	if (hash == 0)
		hash = 1;
	start = hash % HASH_SIZE;

	/*
	 * Look for an existing entry along the whole probe sequence first:
	 * when an earlier probe slot has expired, claiming it would create a
	 * second entry for a flow that still owns a later slot.
	 */
	for (i = 0; i < HASH_SIZE; i++) {
		slot = (start + i) % HASH_SIZE;
		if (q->is_used[slot] == hash)
			return slot;
	}

	/* New flow: take the first free slot in probe order. */
	for (i = 0; i < HASH_SIZE; i++) {
		slot = (start + i) % HASH_SIZE;
		if (q->is_used[slot] == 0) {
			printk(KERN_DEBUG "New flow, hash: %u, index: %i \n", hash, slot);
			flow_claim_slot(q, slot, hash);
			return slot;
		}
	}

	/* Table full; rate-limited because this can fire for every packet. */
	if (net_ratelimit())
		printk(KERN_WARNING "No hash bucket found!\n");
	return -1;
}

/*
 * flow_is_valid - return 1 if @skb is an IPv4 TCP segment with ACK set that
 * belongs to a conntrack entry in IP_CT_ESTABLISHED state, else 0.
 */
static __inline__ int flow_is_valid(struct sk_buff *skb)
{
	struct iphdr *iph;
	struct tcphdr *tcph;
	enum ip_conntrack_info ctinfo;

	/* Check the protocol before touching any IP/TCP header field. */
	if (skb->protocol != __constant_htons(ETH_P_IP))
		return 0;
	iph = skb->nh.iph;
	if (iph->protocol != IPPROTO_TCP)
		return 0;
	if (iph->frag_off & __constant_htons(IP_OFFSET))
		return 0;
	/* Untracked packet: do not rely on ctinfo in that case. */
	if (ip_conntrack_get(skb, &ctinfo) == NULL)
		return 0;
	if (ctinfo != IP_CT_ESTABLISHED)
		return 0;

	tcph = (void *)iph + (iph->ihl * 4);
	return tcph->ack ? 1 : 0;
}

/* Slot for @skb: its flow slot when flow_is_valid(), slot 0 otherwise; -1 if the table is full. */
static __inline__ short flow_classify(struct sk_buff *skb, struct Qdisc *sch)
{
	return flow_is_valid(skb) ? flow_hash(skb, sch) : 0;
}

/* Effective total packet limit. */
static __inline__ u32 flow_limit(const struct flow_sched_data *q)
{
	return q->limit ? q->limit : LIMIT;
}

/*
 * flow_enqueue - accept @skb into its slot queue.
 *
 * Unmarked packets (nfmark == 0), packets over the limit, and packets whose
 * flow cannot get a slot are dropped and NET_XMIT_CN is returned.
 */
static int flow_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	short hash;

	/* Checked first so unmarked traffic never claims a flow slot. */
	if (!skb->nfmark)
		goto drop;

	/* Reject before queueing: once linked into qs[] the skb must not be freed here. */
	if ((q->qlen && sch->q.qlen >= q->qlen) ||
	    sch->q.qlen >= flow_limit(q)) {
		if (net_ratelimit())
			printk(KERN_DEBUG "Dropping packet, queue full\n");
		goto drop;
	}

	hash = flow_classify(skb, sch);
	if (hash < 0)
		goto drop;

	__skb_queue_tail(&q->qs[hash], skb);
	sch->q.qlen++;
	sch->stats.bytes += skb->len;
	sch->stats.packets++;

	/* (Re)arm the slot's idle timer; add_timer() on a pending timer is a BUG. */
	mod_timer(&q->remove_timer[hash], jiffies + FLOW_LIFETIME);
	return NET_XMIT_SUCCESS;

drop:
	sch->stats.drops++;
	kfree_skb(skb);
	return NET_XMIT_CN;
}

/* flow_drop - drop the tail packet of the first non-empty slot; return its length or 0. */
static unsigned int flow_drop(struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	struct sk_buff *skb;
	unsigned int len;
	int i;

	/* Bound is checked before any qs[i] access. */
	for (i = 0; i < HASH_SIZE; i++) {
		skb = __skb_dequeue_tail(&q->qs[i]);
		if (skb == NULL)
			continue;
		len = skb->len;
		kfree_skb(skb);
		sch->q.qlen--;
		sch->stats.drops++;
		return len;
	}
	return 0;
}

/*
 * flow_requeue - put @skb back at the head of its slot.
 *
 * Classification matches flow_enqueue(). When the total goes over the
 * limit, one packet is dropped via flow_drop() and NET_XMIT_CN is returned.
 */
static int flow_requeue(struct sk_buff *skb, struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	short hash = flow_classify(skb, sch);

	/* Never index qs[-1]; fall back to slot 0 when the table is full. */
	if (hash < 0)
		hash = 0;

	__skb_queue_head(&q->qs[hash], skb);
	if (++sch->q.qlen <= flow_limit(q))
		return NET_XMIT_SUCCESS;

	flow_drop(sch);
	return NET_XMIT_CN;
}
static __inline__ struct sk_buff * is_time_to_send(struct Qdisc *sch, psched_time_t *now, struct flow_sched_data *q, int *i, long *min) { long diff; for (; *i < HASH_SIZE; (*i)++){ diff = PSCHED_TDIFF(q->next_send_time[*i], *now); if (q->qs[*i].qlen >0){ //printk("FOUND used queue %i \n", *i); //printk("DIFF: %li\n", diff); if (diff <= 0) //printk("Send NOW %i\n", *i); return __skb_dequeue(&q->qs[*i]); else{ if (diff > 0 && (diff < *min || *min ==-1)){ //printk("Setting min %li for i=%i \n", diff, *i); *min = diff; } } } } return NULL; } static struct sk_buff *flow_dequeue(struct Qdisc *sch) { struct flow_sched_data *q = qdisc_priv(sch); struct sk_buff *skb; struct ip_conntrack *ct; enum ip_conntrack_info ctinfo; int i = 0; long min = -1; psched_time_t now; PSCHED_GET_TIME(now); while ( (skb = is_time_to_send(sch, &now, q, &i, &min) ) != NULL){ //Get the conntrack info ct = ip_conntrack_get(skb, &ctinfo); //Decrease Qdisc counter sch->q.qlen--; //Turn flag OFF sch->flags &= ~TCQ_F_THROTTLED; //printk("flow_dequeue hash=%i skb=%p @%lu\n",i, skb, jiffies); //printk("RATE: %u\n", ct->proto.tcp.rate); //Update next send time //q->next_send_time[i] = PSCHED_TADD(now, PSCHED_JIFFIE2US(ct->proto.tcp.rate)); q->next_send_time[i] = PSCHED_TADD(now, q->interval[i]); q->last_pkt_sent[i] = now; //printk(" Next send time is %lu\n", q->next_send_time[i]); return skb; } if (skb == NULL){ long delay = PSCHED_US2JIFFIE(min); if (min == -1){ //noting to send printk(" Noting to send\n"); return NULL; } if (delay <= 0) delay = 1; mod_timer(&q->timer, jiffies+delay); //printk(" Not time, sch->q.qlen= %i and limit %i\n", sch->q.qlen, q->limit); //printk(" delaying jiffies = %li\n", delay); //Turn flag ON sch->flags |= TCQ_F_THROTTLED; } return NULL; } static void flow_reset(struct Qdisc *sch) { struct flow_sched_data *q = qdisc_priv(sch); struct sk_buff *skb; while((skb = flow_dequeue(sch)) != NULL) kfree_skb(skb); sch->q.qlen = 0; sch->flags &= ~TCQ_F_THROTTLED; del_timer(&q->timer); } 
static void flow_timer(unsigned long arg) { struct Qdisc *sch = (struct Qdisc *)arg; sch->flags &= ~TCQ_F_THROTTLED; netif_schedule(sch->dev); } static __inline__ void flow_timer_del(unsigned long p) { struct flow_sched_data *q = (struct flow_sched_data *) p; psched_time_t now; psched_tdiff_t diff; int i = 0; printk("TIMER CALLED\n"); printk("sizeof psched_time_t %i\n", sizeof(psched_time_t)); //Some flow must be expired since we get called, //must find it PSCHED_GET_TIME(now); for(i = 0; i < HASH_SIZE; i++) if (q->is_used[i] != 0 || i == 0){ // (now - last_sent) > FLOW_LIFETIME //printk("Now %llu, last sent %llu \n", now, q->last_pkt_sent[i]); //printk("PSCEHED_TDIFF %li, %d\n", PSCHED_TDIFF(now, q->last_pkt_sent[i]), FLOW_LIFETIME); //printk("us2jiffies %lu\n", PSCHED_US2JIFFIE(PSCHED_TDIFF(now, q->last_pkt_sent[i]))); diff = PSCHED_TDIFF(now, q->last_pkt_sent[i]); if ( PSCHED_US2JIFFIE(diff) >= FLOW_LIFETIME){ printk("Deleteing flow %i \n", i); q->is_used[i] = 0; } } } static int flow_init(struct Qdisc *sch, struct rtattr *opt) { struct flow_sched_data *q = qdisc_priv(sch); int i; psched_time_t now; printk("----Initialising sch_flow\n----"); PSCHED_GET_TIME(now); for (i=0; i < HASH_SIZE; i++){ skb_queue_head_init(&q->qs[i]); q->is_used[i] = 0; q->last_pkt_sent[i] = now; //constant delay of 200ms q->interval[i] = PSCHED_JIFFIE2US(HZ/5); init_timer(&q->remove_timer[i]); q->remove_timer[i].function = flow_timer_del; q->remove_timer[i].data = (unsigned long) q; } init_timer(&q->timer); q->timer.function = flow_timer; q->timer.data = (unsigned long) sch; return 0; } static void flow_destroy(struct Qdisc *sch) { struct flow_sched_data *q = qdisc_priv(sch); int i; printk("----Destroy called\n"); for (i = 0; i < HASH_SIZE; i++) del_timer(&q->remove_timer[i]); del_timer(&q->timer); } static int flow_dump(struct Qdisc *sch, struct sk_buff *skb) { return skb->len; } static struct Qdisc_ops flow_qdisc_ops = { .id = "flow", .priv_size = sizeof(struct flow_sched_data), 
.enqueue = flow_enqueue, .dequeue = flow_dequeue, .requeue = flow_requeue, .drop = flow_drop, .init = flow_init, .reset = flow_reset, .destroy = flow_destroy, .change = flow_init, .dump = flow_dump, .owner = THIS_MODULE, }; static int __init flow_module_init(void) { return register_qdisc(&flow_qdisc_ops); } static void __exit flow_module_exit(void) { unregister_qdisc(&flow_qdisc_ops); } module_init(flow_module_init) module_exit(flow_module_exit) MODULE_LICENSE("GPL");
_______________________________________________ LARTC mailing list LARTC@mailman.ds9a.nl http://mailman.ds9a.nl/cgi-bin/mailman/listinfo/lartc