Naturally I forgot to attach the code, as I was rushing to get to lunch.

R.harper

P.S. Why do my messages end up as replies to someone else's message in the archive?

_________________________________________________________________
Log på MSN Messenger direkte fra nettet  http://webmessenger.msn.com/
/*
  Flow based Qdisc scheduler 
*/
#include <linux/config.h>
#include <linux/module.h>
#include <linux/types.h>
#include <linux/kernel.h>
#include <linux/string.h>
#include <linux/mm.h>
#include <linux/socket.h>
#include <linux/sockios.h>
#include <linux/in.h>
#include <linux/errno.h>
#include <linux/interrupt.h>
#include <linux/if_ether.h>
#include <linux/inet.h>
#include <linux/netdevice.h>
#include <linux/etherdevice.h>
#include <linux/notifier.h>
#include <net/ip.h>
#include <net/route.h>
#include <linux/skbuff.h>
#include <net/sock.h>
#include <net/pkt_sched.h>

#include <linux/jhash.h>
#include <linux/netfilter_ipv4/ip_conntrack.h>
#include <linux/netfilter_ipv4/ip_conntrack_protocol.h>
#include <linux/netfilter_ipv4/ip_conntrack_tcp.h>
#include <linux/netfilter_ipv4/ip_tables.h>


/* Compile-time tunables for the flow scheduler. */

/* NOTE(review): not referenced anywhere in the code shown here. */
#define LATENCY 100000
/* NOTE(review): not referenced anywhere in the code shown here; the
 * per-qdisc 'limit' field is never assigned from it. */
#define LIMIT 50
/* Number of slots in the flow table; every per-flow array has this size. */
#define HASH_SIZE 20
/* Idle time, in jiffies, after which a flow slot may be released. */
#define FLOW_LIFETIME (10*HZ)


/* Protects conntrack->proto.tcp*/
//static DECLARE_RWLOCK(tcp_lock);



/*
 * Private state of one "flow" qdisc instance (obtained via qdisc_priv()).
 * All per-flow arrays are indexed by the slot number returned by flow_hash().
 */
struct flow_sched_data {
  /* Not read or written by the code in this file. */
  u32     latency;
  /* Compared against sch->q.qlen on enqueue/requeue; never assigned here. */
  u32     limit;
  /* Optional cap on sch->q.qlen checked in flow_enqueue(); 0 disables it. */
  u32     qlen;

  /* One packet queue per flow slot. */
  struct sk_buff_head qs[HASH_SIZE];
  /* Hash of the flow owning the slot, or 0 when the slot is free. */
  u32 is_used[HASH_SIZE];
  /* Timestamp recorded when a packet was last dequeued from the slot. */
  psched_time_t last_pkt_sent[HASH_SIZE];
  /* Earliest time the next packet of the slot may be dequeued. */
  psched_time_t next_send_time[HASH_SIZE];
  /* Spacing added between packets of the slot; set in flow_init(). */
  psched_time_t interval[HASH_SIZE];
  /* Per-slot expiry timers; handler is flow_timer_del(). */
  struct timer_list remove_timer[HASH_SIZE];
  /* Wake-up timer armed by flow_dequeue(); handler is flow_timer(). */
  struct timer_list timer;
  /* Not referenced by the code in this file. */
  struct sk_buff_head fasttrack;
};


/*
 * Compute the flow hash of a packet.
 *
 * Returns 0 for anything that is not IPv4. For TCP (IP protocol number 6)
 * the hash covers addresses, protocol and both ports; for every other IP
 * protocol it covers addresses and protocol only.
 */
static __inline__ u32 get_hash(struct sk_buff *skb)
{
  struct iphdr *ip;
  struct tcphdr *tcp;
  u32 ports;

  if (skb->protocol != __constant_htons(ETH_P_IP))
    return 0;

  ip = skb->nh.iph;
  if (ip->protocol != 6)
    return jhash_3words(ip->saddr, ip->daddr, ip->protocol, 0x543298ff);

  /* The TCP header starts right after the variable-length IP header. */
  tcp = (void *)ip + (ip->ihl*4);
  ports = (tcp->dest << 16 | tcp->source);
  return jhash_3words(ip->saddr^ip->protocol, ip->daddr, ports, 0x543298ff);
}

/*
 * Map a packet to a slot of the flow table, claiming a free slot for a
 * flow that is not in the table yet.
 *
 * The table is probed linearly starting at (hash % HASH_SIZE). The whole
 * table is searched for an existing entry BEFORE a free slot is claimed:
 * a flow that was placed in a probed slot must keep using it even after
 * its home slot has been released, otherwise the flow would end up with
 * two slots and its packets would be split across two queues.
 *
 * A hash of 0 is indistinguishable from the "free" marker in is_used[];
 * such a packet is mapped to the first free slot without claiming it.
 *
 * Returns the slot index, or -1 when the flow is unknown and no slot is
 * free.
 */
static __inline__ short flow_hash(struct sk_buff *skb, struct Qdisc *sch)
{
  struct flow_sched_data *q = qdisc_priv(sch);
  u32 hash = get_hash(skb);
  short home = hash % HASH_SIZE;
  short slot;
  int i;

  /* Pass 1: does this flow already own a slot? */
  for (i = 0; i < HASH_SIZE; i++) {
    slot = (home + i) % HASH_SIZE;
    if (q->is_used[slot] == hash)
      return slot;
  }

  /* Pass 2: new flow, take the first free slot on the probe sequence. */
  for (i = 0; i < HASH_SIZE; i++) {
    slot = (home + i) % HASH_SIZE;
    if (q->is_used[slot] == 0) {
      psched_time_t now;

      PSCHED_GET_TIME(now);
      printk("New flow, hash: %u, index: %i \n", hash, slot);
      q->is_used[slot] = hash;
      /* First packet of the flow may leave one interval from now. */
      PSCHED_TADD2(now, q->interval[slot], q->next_send_time[slot]);
      return slot;
    }
  }

  /* Table is full and the flow is not in it. */
  printk(KERN_WARNING "No hash bucket found!\n");
  return -1;
}


static __inline__ int flow_is_valid(struct sk_buff *skb){
  
  struct iphdr *iph = skb->nh.iph;
  enum ip_conntrack_info ctinfo;
  struct tcphdr *tcph = (void *)iph + (iph->ihl*4);

  if (skb->protocol != __constant_htons(ETH_P_IP) )
    return 0;

  if (iph->protocol != IPPROTO_TCP)
    return 0;

  ip_conntrack_get(skb, &ctinfo);

  if (!tcph->ack)
    return 0;

  if (ctinfo != IP_CT_ESTABLISHED)
    return 0;
  
  return 1;  
}


/*
 * Qdisc enqueue hook: append a packet to the queue of its flow.
 *
 * Packets that pass flow_is_valid() are queued on the slot chosen by
 * flow_hash(); everything else shares slot 0. Packets without an nfmark,
 * packets arriving while the queue is full, and packets for which no slot
 * can be found are dropped.
 *
 * All drop decisions are taken before the packet is linked into a queue
 * and before a flow slot is claimed, so a dropped packet is never freed
 * while still on a list and never occupies a table entry.
 *
 * Returns NET_XMIT_SUCCESS when queued, NET_XMIT_CN when dropped.
 */
static int flow_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
  struct flow_sched_data *q = qdisc_priv(sch);
  short hash;

  if (!skb->nfmark)
    goto drop;

  if (q->qlen && sch->q.qlen >= q->qlen){
    printk("Dropping packet, queue full\n");
    goto drop;
  }

  /* Same bound as before (qlen after insertion must stay below limit-1),
   * but evaluated before the packet is queued. */
  if (sch->q.qlen + 1 >= q->limit - 1)
    goto drop;

  if (flow_is_valid(skb) == 1)
    hash = flow_hash(skb, sch);
  else
    hash = 0;

  if (hash == -1)
    goto drop;

  __skb_queue_tail(&q->qs[hash], skb);

  /* The expiry timer is usually still pending from the previous packet
   * of this flow, so it has to be re-armed with mod_timer(); add_timer()
   * must not be used on a pending timer. */
  mod_timer(&q->remove_timer[hash], jiffies + FLOW_LIFETIME);

  sch->q.qlen++;
  sch->stats.bytes += skb->len;
  sch->stats.packets++;
  return NET_XMIT_SUCCESS;

drop:
  sch->stats.drops++;
  kfree_skb(skb);
  return NET_XMIT_CN;
}

/*
 * Qdisc drop hook: discard one packet to make room.
 *
 * The victim is the last packet of the first non-empty flow queue.
 * Returns the length of the dropped packet, or 0 if every queue is empty.
 */
static unsigned int flow_drop(struct Qdisc *sch)
{
  struct flow_sched_data *q = qdisc_priv(sch);
  struct sk_buff *skb;
  unsigned int len;
  int i;

  /* The bound is checked before qs[i] is touched, so the scan never
   * reads past the end of the array. */
  for (i = 0; i < HASH_SIZE; i++) {
    if (q->qs[i].qlen == 0)
      continue;

    /* 'prev' of the list head is the tail of the queue. */
    skb = q->qs[i].prev;
    len = skb->len;
    __skb_unlink(skb, &q->qs[i]);
    kfree_skb(skb);
    sch->q.qlen--;
    sch->stats.drops++;
    return len;
  }

  /* Nothing queued anywhere. */
  return 0;
}

/*
 * Qdisc requeue hook: put a packet back at the head of its flow queue.
 *
 * The slot is chosen exactly as in flow_enqueue() (flow_hash() for
 * packets accepted by flow_is_valid(), slot 0 otherwise) so the packet
 * returns to the queue it was taken from. If no slot is available the
 * packet is dropped rather than written to an invalid index.
 *
 * Returns NET_XMIT_SUCCESS, or NET_XMIT_CN when a packet had to be
 * dropped.
 */
static int flow_requeue(struct sk_buff *skb, struct Qdisc *sch)
{
  struct flow_sched_data *q = qdisc_priv(sch);
  short hash;

  if (flow_is_valid(skb) == 1)
    hash = flow_hash(skb, sch);
  else
    hash = 0;

  if (hash == -1) {
    /* flow_hash() found neither the flow nor a free slot. */
    sch->stats.drops++;
    kfree_skb(skb);
    return NET_XMIT_CN;
  }

  __skb_queue_head(&q->qs[hash], skb);
  if (++sch->q.qlen < q->limit-1)
    return NET_XMIT_SUCCESS;

  /* Over the limit: make room by dropping one queued packet. */
  flow_drop(sch);
  return NET_XMIT_CN;
}


/*
 * Scan the flow queues, starting at slot *i, for a packet that is due.
 *
 * Returns the head packet of the first non-empty queue whose
 * next_send_time is not later than *now; *i is left at that slot.
 * For non-empty queues that are not yet due, *min is lowered to the
 * smallest remaining wait seen so far (-1 means "none recorded").
 * Returns NULL, with *i == HASH_SIZE, when nothing is due.
 */
static __inline__ struct sk_buff *
is_time_to_send(struct Qdisc *sch, psched_time_t *now, struct flow_sched_data *q, int *i, long *min)
{
  while (*i < HASH_SIZE) {
    if (q->qs[*i].qlen > 0) {
      long wait = PSCHED_TDIFF(q->next_send_time[*i], *now);

      if (wait <= 0)
        return __skb_dequeue(&q->qs[*i]);

      /* Not due yet: remember the shortest wait. */
      if (*min == -1 || wait < *min)
        *min = wait;
    }
    (*i)++;
  }

  return NULL;
}

/*
 * Qdisc dequeue hook: hand out the next packet that is due.
 *
 * If some flow has a packet whose send time has arrived, that packet is
 * returned, the flow's last_pkt_sent is set to the current time and its
 * next_send_time to the current time plus the flow's interval.
 *
 * If packets are queued but none is due, the wake-up timer is armed for
 * the shortest remaining wait (at least one jiffy), the qdisc is marked
 * throttled and NULL is returned. NULL is also returned when nothing is
 * queued at all.
 */
static struct sk_buff *flow_dequeue(struct Qdisc *sch)
{
  struct flow_sched_data *q = qdisc_priv(sch);
  struct sk_buff *skb;
  psched_time_t now;
  long min = -1;
  long delay;
  int i = 0;

  PSCHED_GET_TIME(now);

  skb = is_time_to_send(sch, &now, q, &i, &min);
  if (skb != NULL) {
    sch->q.qlen--;
    sch->flags &= ~TCQ_F_THROTTLED;

    /* PSCHED_TADD2 stores now + interval in the third argument and
     * leaves 'now' untouched; PSCHED_TADD would advance 'now' itself
     * and so corrupt the last_pkt_sent timestamp. */
    q->last_pkt_sent[i] = now;
    PSCHED_TADD2(now, q->interval[i], q->next_send_time[i]);
    return skb;
  }

  if (min == -1) {
    /* No non-empty queue was seen. */
    printk("    Noting to send\n");
    return NULL;
  }

  /* Packets are waiting: sleep until the earliest one becomes due. */
  delay = PSCHED_US2JIFFIE(min);
  if (delay <= 0)
    delay = 1;
  mod_timer(&q->timer, jiffies+delay);
  sch->flags |= TCQ_F_THROTTLED;

  return NULL;
}

static void flow_reset(struct Qdisc *sch)
{
  struct flow_sched_data *q = qdisc_priv(sch);
  struct sk_buff *skb;
  
  while((skb = flow_dequeue(sch)) != NULL)
    kfree_skb(skb);
  
  sch->q.qlen = 0;
  sch->flags &= ~TCQ_F_THROTTLED;
  del_timer(&q->timer);
}

/*
 * Handler of the wake-up timer armed by flow_dequeue().
 * 'arg' carries the Qdisc pointer stored in timer.data by flow_init();
 * the throttled flag is cleared and the device is rescheduled.
 */
static void flow_timer(unsigned long arg)
{
  struct Qdisc *qdisc = (struct Qdisc *)arg;

  qdisc->flags &= ~TCQ_F_THROTTLED;
  netif_schedule(qdisc->dev);
}


/*
 * Handler of the per-slot expiry timers.
 *
 * 'p' carries the flow_sched_data pointer stored in remove_timer[].data
 * by flow_init(). The handler is not told which slot fired, so it walks
 * the whole table and frees every slot (slot 0 is always examined) whose
 * last dequeue lies at least FLOW_LIFETIME jiffies in the past.
 *
 * NOTE(review): a slot is released even if its queue still holds
 * packets, and the timer is not re-armed for slots that were not yet
 * old enough - confirm this is intended.
 */
static __inline__ void flow_timer_del(unsigned long p)
{
  struct flow_sched_data *q = (struct flow_sched_data *) p;
  psched_time_t now;
  psched_tdiff_t diff;
  int i;

  printk("TIMER CALLED\n");
  /* sizeof yields size_t; cast so the argument matches %i. */
  printk("sizeof psched_time_t %i\n", (int) sizeof(psched_time_t));

  PSCHED_GET_TIME(now);

  for (i = 0; i < HASH_SIZE; i++) {
    if (q->is_used[i] != 0 || i == 0) {
      diff = PSCHED_TDIFF(now, q->last_pkt_sent[i]);
      if (PSCHED_US2JIFFIE(diff) >= FLOW_LIFETIME) {
        printk("Deleteing flow %i \n", i);
        q->is_used[i] = 0;
      }
    }
  }
}

static int flow_init(struct Qdisc *sch, struct rtattr *opt)
{
  struct flow_sched_data *q = qdisc_priv(sch);
  int i;
  
  psched_time_t now;	
  
  printk("----Initialising sch_flow\n----");
  
  PSCHED_GET_TIME(now);
  
  for (i=0; i < HASH_SIZE; i++){
    skb_queue_head_init(&q->qs[i]);
    q->is_used[i] = 0;
    q->last_pkt_sent[i] = now;
    //constant delay of 200ms
    q->interval[i] = PSCHED_JIFFIE2US(HZ/5);
    init_timer(&q->remove_timer[i]);
    q->remove_timer[i].function = flow_timer_del;
    q->remove_timer[i].data = (unsigned long) q;
  }
  
  init_timer(&q->timer);
  q->timer.function = flow_timer;
  q->timer.data = (unsigned long) sch;
  
  return 0;
}

static void flow_destroy(struct Qdisc *sch)
{
  struct flow_sched_data *q = qdisc_priv(sch);
  int i;

  printk("----Destroy called\n");

  for (i = 0; i < HASH_SIZE; i++)
    del_timer(&q->remove_timer[i]);

  del_timer(&q->timer);
}

/*
 * Qdisc dump hook. Adds nothing to the message; simply reports the
 * current length of 'skb'.
 */
static int flow_dump(struct Qdisc *sch, struct sk_buff *skb)
{
  int msg_len = skb->len;

  return msg_len;
}

/* Operations table registered for the "flow" qdisc. */
static struct Qdisc_ops flow_qdisc_ops = {
  /* Identity */
  .id        = "flow",
  .owner     = THIS_MODULE,
  .priv_size = sizeof(struct flow_sched_data),

  /* Packet path */
  .enqueue   = flow_enqueue,
  .dequeue   = flow_dequeue,
  .requeue   = flow_requeue,
  .drop      = flow_drop,

  /* Lifecycle; a change request re-runs the init routine */
  .init      = flow_init,
  .change    = flow_init,
  .reset     = flow_reset,
  .destroy   = flow_destroy,

  /* Reporting */
  .dump      = flow_dump,
};


/* Module load: register the "flow" qdisc and pass on the result. */
static int __init flow_module_init(void)
{
  int err = register_qdisc(&flow_qdisc_ops);

  return err;
}

/* Module unload: unregister the "flow" qdisc. */
static void __exit flow_module_exit(void)
{
  unregister_qdisc(&flow_qdisc_ops);
}

module_init(flow_module_init)
module_exit(flow_module_exit)
MODULE_LICENSE("GPL");

_______________________________________________
LARTC mailing list
LARTC@mailman.ds9a.nl
http://mailman.ds9a.nl/cgi-bin/mailman/listinfo/lartc

Reply via email to