On Sat, Aug 2, 2008 at 3:00 PM, Mike Makonnen <[EMAIL PROTECTED]> wrote:
> Mike Makonnen wrote:
>>
>> Patrick Tracanelli wrote:
>>>
>>> To let you know about my current (real-world) tests:
>>>
>>> - Wireless Internet Provider 1:
>>>    - 4Mbit/s of Internet Traffic
>>>    - Classifying default protocols + soulseek + ssh
>>>    - Classifying 100Mbit/s of dump over ssh
>>>
>>> Results in:
>>>    No latency added, very low CPU usage, no dropped packets.
>>>
>>> - Wireless ISP 2:
>>>    - 21 Mbit/s of Internet Traffic
>>>    - Classifying default protocols + soulseek + ssh
>>>
>>> Results in:
>>>    No TCP or UDP traffic at all; everything that gets diverted never
>>> comes out of the divert socket, and ipfw-classifyd logs
>>>
>>> Aug  1 12:07:35 ourofino last message repeated 58 times
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: bittorrent
>>> (rule 50000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: edonkey (rule
>>> 50000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: fasttrack (rule
>>> 50000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: gnutella (rule
>>> 1000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: soulseek (rule
>>> 50000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: ssh   (rule
>>> 50000)
>>> Aug  1 12:18:28 ourofino ipfw-classifyd: unable to write to divert
>>> socket: Operation not permitted
>>> Aug  1 12:18:50 ourofino last message repeated 90 times
>>
>> Hmmm... this part means that the call to sendto(2) to write the packet
>> back into the network stack failed.  This explains why you are not seeing any
>> traffic coming back out of the divert socket, but I don't see why it would
>> suddenly fail with a permission error. Could this be a kernel bug?
>>>
>>> Aug  1 12:18:51 ourofino ipfw-classifyd: packet dropped: input queue full
>>> Aug  1 12:19:11 ourofino last message repeated 94 times
>>>
>>> Raised the queue length a lot (up to 40960). When the application starts it
>>> uses up to 25% CPU, and a second after that, CPU usage drops below 0.1%.
>>
>> This looks like a deadlock. If it simply weren't able to process packets fast
>> enough, the CPU usage would stay high even while it's spewing "packet dropped"
>> messages. Can you send me some more information, like memory usage and the
>> firewall script you are using? How much of the 21Mbit/s of traffic is P2P?
>> If you reduce the number of protocols you are trying to match against, does
>> the behavior change? Using netstat -w1 -I<interface>, can you tell me how
>> many packets per second we're talking about for 4Mbit/s and 21Mbit/s? Also,
>> the timestamps from the log file seem to show that the daemon is running for
>> approx. 34 sec. before the first "unable to write to divert socket"
>> message. Is it passing traffic during this time? Thanks.
>>
>> I've uploaded a newer version. Can you try that also please. It includes:
>>  o SIGHUP forces it to re-read its configuration file
>>  o rc.d script
>>  o minor optimization (calls pthread_cond_signal with the mutex unlocked)
>>  o code cleanup
>>
>> Also, for your convenience I have attached a patch against the earlier
>> version that removes a debugging printf, which should stop the spam in your
>> log files (the current version has it removed already).
>>
>
> Oops, a few minutes after I sent this email I found a couple of bugs (one
> major and one minor). They were in the original tarball as well as in the
> newer one I uploaded earlier today. I've uploaded a fixed version of the
> code. Can you try that instead, please?
>
> Also, to help track down performance issues I've modified the Makefile to
> build a profiled version of the application so you can use gprof(1) to
> figure out where any problems lie.
>

Does this look about right for implementing the garbage collector (GC) and a configuration syntax such as:
$protocol = dnpipe 20
$protocol2 = dnqueue 30
It also has some extra goodies for pf(4) and altq(4):
$protocol3 = queue $queue name
$protocol4 = tag TAGNAME
$protocol5 = action block

It adds two new options: -e seconds, the number of seconds before a flow is
considered expired, and -n packets, the number of packets processed before
kicking the GC.

--- classifyd_old.c     2008-08-09 00:33:04.000000000 +0000
+++ classifyd.c 2008-08-09 00:33:34.000000000 +0000
@@ -28,13 +28,17 @@

 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <sys/time.h>

+#include <net/if.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <netinet/in_systm.h>
 #include <netinet/ip.h>
 #include <netinet/tcp.h>
 #include <netinet/udp.h>
+#include <net/pfvar.h>

 #include <assert.h>
 #include <err.h>
@@ -53,6 +57,7 @@
 #include <unistd.h>

 #include "hashtable.h"
+#include "hashtable_private.h"
 #include "pathnames.h"
 #include "protocols.h"

@@ -94,6 +99,7 @@
        uint32_t if_datalen;    /* length in bytes of if_data */
        uint16_t if_pktcount;   /* number of packets concatenated */
        uint16_t if_fwrule;     /* ipfw(4) rule associated with flow */
+       time_t   expire;        /* flow expire time */
 };

 /*
@@ -126,7 +132,7 @@
 static struct ic_queue outQ;

 /* divert(4) socket */
-static int dvtS;
+static int dvtS = 0;

 /* config file path */
 static const char *conf = IC_CONFIG_PATH;
@@ -137,12 +143,25 @@
 /* List of protocols available to the system */
 struct ic_protocols *fp;

+/* Our hashtables */
+struct hashtable *sh = NULL,
+               *th = NULL,
+               *uh = NULL;
+
+/* signaled to kick garbage collector */
+static pthread_cond_t  gq_condvar;
+
+/* number of packets before kicking garbage collector */
+static unsigned int npackets = 250;
+
+static time_t time_expire = 40; /* 40 seconds */
 /*
  * Forward function declarations.
  */
 void           *classify_pthread(void *);
 void           *read_pthread(void *);
 void           *write_pthread(void *);
+void           *garbage_pthread(void *);
 static int     equalkeys(void *, void *);
 static unsigned int hashfromkey(void *);
 static void    test_re(void);
@@ -155,7 +174,7 @@
 {
        struct sockaddr_in addr;
        struct sigaction sa;
-       pthread_t  classifytd, readtd, writetd;
+       pthread_t  classifytd, readtd, writetd, garbagectd;
        const char *errstr;
        long long  num;
        uint16_t   port, qmaxsz;
@@ -164,13 +183,27 @@
        tflag = 0;
        port = IC_DPORT;
        qmaxsz = IC_QMAXSZ;
-       while ((ch = getopt(argc, argv, "htc:P:p:q:")) != -1) {
+       while ((ch = getopt(argc, argv, "n:e:htc:P:p:q:")) != -1) {
                switch(ch) {
                case 'c':
                        conf = strdup(optarg);
                        if (conf == NULL)
                                err(EX_TEMPFAIL, "config file path");
                        break;
+               case 'e':
+                       num = strtonum((const char *)optarg, 1, 400, &errstr);
+                       if (num == 0 && errstr != NULL) {
+                               errx(EX_USAGE, "invalud expire seconds: %s", 
errstr);   
+                       }
+                       time_expire = (time_t)num;
+                       break;
+               case 'n':
+                        num = strtonum((const char *)optarg, 1,
65535, &errstr);
+                        if (num == 0 && errstr != NULL) {
+                                errx(EX_USAGE, "invalud expire
seconds: %s", errstr);
+                        }
+                        npackets = (unsigned int)num;
+                       break;
                case 'P':
                        protoDir = strdup(optarg);
                        if (protoDir == NULL)
@@ -230,6 +263,9 @@
        error = pthread_cond_init(&outQ.fq_condvar, NULL);
        if (error != 0)
                err(EX_OSERR, "unable to initialize output queue condvar");
+        error = pthread_cond_init(&gq_condvar, NULL);
+        if (error != 0)
+                err(EX_OSERR, "unable to initialize garbage collector
condvar");

        /*
         * Create and bind the divert(4) socket.
@@ -276,32 +312,80 @@
        if (error == -1)
                err(EX_OSERR, "unable to set signal handler");

+        /*
+         * There are 3 tables: udp, tcp, and tcp syn.
+         * The tcp syn table tracks connections for which a
+         * SYN packet has been sent but no reply has been returned
+         * yet. Once the SYN ACK reply is detected it is moved to
+         * the regular tcp connection tracking table.
+         */
+        sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (sh == NULL) {
+                syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
+               error = EX_SOFTWARE;
+               goto cleanup;
+        }
+        th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (th == NULL) {
+                syslog(LOG_ERR, "unable to create TCP tracking table");
+               error = EX_SOFTWARE;
+                goto cleanup;
+        }
+        uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (uh == NULL) {
+                syslog(LOG_ERR, "unable to create UDP tracking table");
+               error = EX_SOFTWARE;
+                goto cleanup;
+        }
+
        /*
         * Create the various threads.
         */
        error = pthread_create(&readtd, NULL, read_pthread, NULL);
-       if (error != 0)
-               err(EX_OSERR, "unable to create reader thread");
+       if (error != 0) {
+               syslog(LOG_ERR, "unable to create reader thread");
+               error = EX_OSERR;
+               goto cleanup;
+       }
        error = pthread_create(&classifytd, NULL, classify_pthread, NULL);
-       if (error != 0)
-               err(EX_OSERR, "unable to create classifier thread");
+       if (error != 0) {
+               syslog(LOG_ERR, "unable to create classifier thread");
+               error = EX_OSERR;
+               goto cleanup;
+       }
        error = pthread_create(&writetd, NULL, write_pthread, NULL);
-       if (error != 0)
-               err(EX_OSERR, "unable to create writer thread");
-
+       if (error != 0) {
+               syslog(LOG_ERR, "unable to create writer thread");
+               error = EX_OSERR;
+               goto cleanup;
+       }
+        error = pthread_create(&garbagectd, NULL, garbage_pthread, NULL);
+        if (error != 0) {
+                syslog(LOG_ERR, "unable to create garbage collect thread");
+               error = EX_OSERR;
+               goto cleanup;
+       }
        /*
         * Wait for our threads to exit.
         */
        pthread_join(readtd, NULL);
        pthread_join(classifytd, NULL);
        pthread_join(writetd, NULL);
-
+       pthread_join(garbagectd, NULL);
        /*
         * Cleanup
         */
-       close(dvtS);
+cleanup:
+       if (dvtS > 0)
+               close(dvtS);
+       if (sh != NULL)
+               hashtable_destroy(sh, 1);
+       if (th != NULL)
+               hashtable_destroy(th, 1);
+       if (uh != NULL)
+               hashtable_destroy(uh, 1);
        
-       return (0);
+       return (error);
 }

 void *
@@ -310,6 +394,7 @@
        struct ic_pkt      *pkt;
        struct ip *ipp;
        int       len;
+       unsigned int pcktcnt = 0;

        while (1) {
                pkt = (struct ic_pkt *)malloc(sizeof(struct ic_pkt));
@@ -353,6 +438,10 @@
                STAILQ_INSERT_HEAD(&inQ.fq_pkthead, pkt, fp_link);
                inQ.fq_size++;
                pthread_mutex_unlock(&inQ.fq_mtx);
+               if (++pcktcnt > npackets) {
+                       pcktcnt = 0;
+                       pthread_cond_signal(&gq_condvar);
+               }
                pthread_cond_signal(&inQ.fq_condvar);
        }

@@ -420,39 +509,19 @@
        struct tcphdr    *tcp;
        struct udphdr    *udp;
        struct ic_pkt    *pkt;
-       struct hashtable *sh, *th, *uh;
        struct protocol  *proto;
+       struct timeval   tv;
        regmatch_t       pmatch;
        u_char           *data, *payload;
        uint16_t         trycount;
        int              datalen, error;

-       /*
-        * There are 3 tables: udp, tcp, and tcp syn.
-        * The tcp syn table tracks connections for which a
-        * SYN packet has been sent but no reply has been returned
-        * yet. Once the SYN ACK reply is detected it is moved to
-        * the regular tcp connection tracking table.
-        */
-       sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-       if (sh == NULL) {
-               syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
-               exit(EX_SOFTWARE);
-       }
-       th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-       if (th == NULL) {
-               syslog(LOG_ERR, "unable to create TCP tracking table");
-               exit(EX_SOFTWARE);
-       }
-       uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-       if (uh == NULL) {
-               syslog(LOG_ERR, "unable to create UDP tracking table");
-               exit(EX_SOFTWARE);
-       }
-
        flow = NULL;
        key = NULL;
        while(1) {
+               while(gettimeofday(&tv, NULL) != 0)
+                       ;
+
                pthread_mutex_lock(&inQ.fq_mtx);
                pkt = STAILQ_LAST(&inQ.fq_pkthead, ic_pkt, fp_link);
                while (pkt == NULL) {
@@ -528,6 +597,8 @@
                                        free(pkt);
                                        continue;
                                }
+                               
+                               flow->expire = tv.tv_sec;
                                goto enqueue;
                        /*
                         * Handle session tear-down.
@@ -583,8 +654,11 @@
                                 * collecting IC_PKTMAXMATCH packets, just pass 
it through.
                                 */
                                } else if (flow->if_pktcount >= IC_PKTMAXMATCH 
&&
-                                   flow->if_fwrule == 0)
+                                   flow->if_fwrule == 0) {
+                                       flow->expire = tv.tv_sec;
                                        goto enqueue;
+                               }
+                               flow->expire = tv.tv_sec;
                                goto classify;
                        }

@@ -630,6 +704,7 @@
                                        free(pkt);
                                        continue;
                                }
+                               flow->expire = tv.tv_sec;
                                goto classify;
                        }

@@ -688,6 +763,7 @@
                                flow->if_datalen = datalen;
                                flow->if_pktcount = 1;
                                flow->if_fwrule = 0;
+                               flow->expire = tv.tv_sec;
                                if (hashtable_insert(uh, (void *)key, (void 
*)flow) == 0) {
                                        syslog(LOG_WARNING,
                                            "packet dropped: unable to insert 
into table");
@@ -715,19 +791,26 @@
                                flow->if_data = data;
                                flow->if_datalen += datalen;
                                flow->if_pktcount++;
+                               flow->expire = tv.tv_sec;
                        /*
                         * If we haven't been able to classify this flow after
                         * collecting IC_PKTMAXMATCH packets, just pass it 
through.
                         */
                        } else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
-                           flow->if_fwrule == 0)
+                           flow->if_fwrule == 0) {
+                               flow->expire = tv.tv_sec;
                                goto enqueue;
+                       }
                } else
                        /* Not an TCP or UDP packet. */
                        goto enqueue;

 classify:
-               assert(flow != NULL);
+               if (flow == NULL) {
+                       syslog(LOG_ERR, "flow is null argghhhhhhh");
+                       goto enqueue;
+               }
+               //assert(flow != NULL);

                /*
                 * Inform divert(4) what rule to send it to by
@@ -823,6 +906,80 @@
        return (NULL);
 }

+void *
+garbage_pthread(void *arg __unused)
+{
+       char errbuf[LINE_MAX];
+       struct entry *e, *f;
+       unsigned int i, flows_expired, error;
+       struct timeval tv;
+
+       while (1) {
+               flows_expired = 0;
+               while (gettimeofday(&tv, NULL) != 0)
+                       ;
+               tv.tv_sec -= time_expire;
+
+               pthread_mutex_lock(&inQ.fq_mtx);
+                error = pthread_cond_wait(&gq_condvar, &inQ.fq_mtx);
+                if (error != 0) {
+                        strerror_r(error, errbuf, sizeof(errbuf));
+                        syslog(EX_OSERR, "unable to wait on garbage
collection: %s",
+                                errbuf);
+                        exit(EX_OSERR);
+                }
+
+               for (i = 0; i < sh->tablelength; i++) {
+                       e = sh->table[i];
+                       while (e != NULL) {
+                               f = e; e = e->next;
+                               if (((struct ip_flow *)f->v)->expire < 
tv.tv_sec) {
+                                       freekey(f->k);
+                                       sh->entrycount--;
+                                       if (f->v != NULL)
+                                               free(f->v);
+                                       free(f);
+                                       flows_expired++;
+                               }
+                       }
+               }
+                for (i = 0; i < th->tablelength; i++) {
+                        e = th->table[i];
+                        while (e != NULL) {
+                                f = e; e = e->next;
+                                if (((struct ip_flow *)f->v)->expire
< tv.tv_sec) {
+                                        freekey(f->k);
+                                        th->entrycount--;
+                                        if (f->v != NULL)
+                                                free(f->v);
+                                        free(f);
+                                       flows_expired++;
+                                }
+                        }
+                }
+                for (i = 0; i < uh->tablelength; i++) {
+                        e = uh->table[i];
+                        while (e != NULL) {
+                                f = e; e = e->next;
+                                if (((struct ip_flow *)f->v)->expire
< tv.tv_sec) {
+                                        freekey(f->k);
+                                        uh->entrycount--;
+                                        if (f->v != NULL)
+                                                free(f->v);
+                                        free(f);
+                                       flows_expired++;
+                                }
+                        }
+                }
+
+               pthread_mutex_unlock(&inQ.fq_mtx);
+               
+               syslog(LOG_WARNING, "expired %u flows", flows_expired);
+       }
+
+       return (NULL);
+}
+
 /*
  * NOTE: The protocol list (plist) passed as an argument is a global
  *      variable. It is accessed from 3 functions: classify_pthread,
@@ -840,12 +997,20 @@
 static int
 read_config(const char *file, struct ic_protocols *plist)
 {
+       enum { bufsize = 2048 };
        struct protocol *proto;
        properties      props;
-       const char      *errmsg, *name, *value;
-       int             fd;
+       const char      *errmsg, *name;
+       char            *value;
+       int             fd, fdpf;
        uint16_t        rule;
+       char **ap, *argv[bufsize];

+       fdpf = open("/dev/pf", O_RDONLY);
+       if (fdpf == -1) {
+               syslog(LOG_ERR, "unable to open /dev/pf");
+               return (EX_OSERR);
+       }
        fd = open(file, O_RDONLY);
        if (fd == -1) {
                syslog(LOG_ERR, "unable to open configuration file");
@@ -863,10 +1028,48 @@
                /* Do not match traffic against this pattern */
                if (value == NULL)
                        continue;
-               rule = strtonum(value, 1, 65535, &errmsg);
-               if (rule == 0) {
+               for (ap = argv; (*ap = strsep(&value, " \t")) != NULL;)
+                       if (**ap != '\0')
+                               if (++ap >= &argv[bufsize])
+                                       break;
+               if (!strncmp(argv[0], "queue", strlen("queue"))) {
+                       if (ioctl(fdpf, DIOCGETNAMEDALTQ, &rule)) {
+                               syslog(LOG_WARNING,
+                                       "could not get ALTQ translation for"
+                                       " queue %s", argv[1]);
+                               continue;
+                       }
+                       if (rule == 0) {
+                               syslog(LOG_WARNING,
+                                       "queue %s does not exists!", argv[1]);
+                               continue;
+                       }
+               } else if (!strncmp(argv[0], "dnqueue", strlen("dnqueue")))
+                       rule = strtonum(argv[1], 1, 65535, &errmsg);
+               else if (!strncmp(argv[0], "dnpipe", strlen("dnpipe")))
+                       rule = strtonum(argv[1], 1, 65535, &errmsg);
+               else if (!strncmp(argv[0], "tag", strlen("tag"))) {
+                        if (ioctl(fdpf, DIOCGETNAMEDTAG, &rule)) {
+                                syslog(LOG_WARNING,
+                                        "could not get tag translation for"
+                                        " queue %s", argv[1]);
+                                continue;
+                        }
+                        if (rule == 0) {
+                                syslog(LOG_WARNING,
+                                        "tag %s does not exists!", argv[1]);
+                                continue;
+                        }
+               } else if (!strncmp(argv[0], "action", strlen("action"))) {
+                       if (strncmp(argv[1], "block", strlen("block")))
+                               rule = PF_DROP;
+                       else if (strncmp(argv[1], "allow", strlen("allow")))
+                               rule = PF_PASS;
+                       else
+                               continue;
+               } else {
                        syslog(LOG_WARNING,
-                           "invalid rule number for %s protocol: %s",
+                           "invalid action specified for %s protocol: %s",
                            proto->p_name, errmsg);
                        continue;
                }
@@ -953,10 +1156,14 @@
 static void
 usage(const char *arg0)
 {
-       printf("usage: %s [-h] [-c file] [-p port] [-P dir] [-q length]\n",
basename(arg0));
+       printf("usage: %s [-h] [-c file] [-e seconds] [-n packets] "
+               "[-p port] [-P dir] [-q length]\n", basename(arg0));
        printf("usage: %s -t -P dir\n", basename(arg0));
        printf( "    -c file   : path to configuration file\n"
+               "    -e secs   : number of seconds before a flow is expired\n"
                "    -h        : this help screen\n"
+               "    -n packets: number of packets before the garbage collector"
+                       " tries to expire flows\n"
                "    -P dir    : directory containing protocol patterns\n"
                "    -p port   : port number of divert socket\n"
                "    -q length : max length (in packets) of in/out queues\n"

-- 
Ermal
_______________________________________________
freebsd-net@freebsd.org mailing list
http://lists.freebsd.org/mailman/listinfo/freebsd-net
To unsubscribe, send any mail to "[EMAIL PROTECTED]"

Reply via email to