Hi Craig, I meant to reply to this earlier! I've just updated my variant of pfdnacluster_master to reflect the latest PF_RING SVN (attached).
It compiles, but I can't test it easily at the moment (I have to go through a change management process before I change anything in PF_RING on our live servers, which are the only ones with Intel cards + DNA, ever since it managed to seriously upset the border switch by *sending* vast quantities of packets to its mirror port. Not sure how that happened - DNA was supposed to be in receive only mode!) You may find that ARGUS will use up all available cycles on its CPU cores (unless the issue with DNA and select() has been fixed). I'm not running it with multiple interfaces at the moment. I'm running with pfdnacluster_master_cdw -i dna0 -c 1 -n 8 -D 2 -A 1 -l 1522 -d to have 8 queues, duplicated, plus an additional queue that gets everything. Suricata is using dnacl1:0-7, Bro IDS is using dnacl1:8-15 and ARGUS is using dnacl1:16 Also make sure you're running an up-to-date ARGUS if you have IPv6 traffic - there was a bug that caused it to modify IPv6 packets in memory, which is disastrous in zero-copy! Best Wishes, Chris On 30/11/12 19:07, Craig Merchant wrote: > We're in the process of deploying the redBorder Snort management > solution (www.redborder.net<http://www.redborder.net>). > > The boxes we're using for sensors each have a dual-port fiber adapter > from Silicom with the DNA/libzero license. This is the first time > I've tried to configure DNA and libzero before, so I'd love a little > guidance from the community. Here's what I'm looking to do: > > I want to share traffic between Snort and the Argus flow collector > tool. I want to hash and distribute traffic to Snort such that each > of the 16 instances only sees a subset of the traffic. I want a > single instance of Argus to view all of the traffic. Argus can read > data from multiple interfaces or channels simultaneously. > > Do I use pfdnacluster_master or pfdnacluster_multithread? 
I'm not > clear on how I can hash the traffic and fan it out to X number of > consumers and then make a zero-copy of that fanned out traffic? > > Thanks. > > Craig > -- --+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+- Christopher Wakelin, [email protected] IT Services Centre, The University of Reading, Tel: +44 (0)118 378 2908 Whiteknights, Reading, RG6 6AF, UK Fax: +44 (0)118 975 3094
/* * * (C) 2012 - Luca Deri <[email protected]> * Alfredo Cardigliano <[email protected]> * * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */ #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include <signal.h> #include <sched.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <string.h> #include <unistd.h> #include <sys/mman.h> #include <errno.h> #include <sys/poll.h> #include <netinet/in_systm.h> #include <netinet/in.h> #include <netinet/ip.h> #include <net/ethernet.h> /* the L2 protocols */ #include <sys/time.h> #include <time.h> #include <sys/socket.h> #include <arpa/inet.h> #include <inttypes.h> #include "pfring.h" #define ALARM_SLEEP 1 #define MAX_NUM_APP DNA_CLUSTER_MAX_NUM_SLAVES #define MAX_NUM_DEV DNA_CLUSTER_MAX_NUM_SOCKETS #define DEFAULT_DEVICE "dna0" int num_app = 1, num_dups = 1, num_allcap = 0, num_dev = 0; int snaplen = 1500; pfring *pd[MAX_NUM_DEV]; pfring_dna_cluster *dna_cluster_handle; u_int8_t wait_for_packet = 1, print_interface_stats = 0, do_shutdown = 0, hashing_mode = 0, use_hugepages = 0; socket_mode mode = recv_only_mode; u_int32_t dup_mask, all_mask, i; static struct timeval startTime; void bind2core(u_int core_id) { cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(core_id, &cpuset); if(pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) fprintf(stderr, 
"Error while binding to core %u\n", core_id); } /* *************************************** */ double delta_time (struct timeval * now, struct timeval * before) { time_t delta_seconds; time_t delta_microseconds; delta_seconds = now -> tv_sec - before -> tv_sec; delta_microseconds = now -> tv_usec - before -> tv_usec; if(delta_microseconds < 0) { delta_microseconds += 1000000; /* 1e6 */ -- delta_seconds; } return((double)(delta_seconds * 1000) + (double)delta_microseconds/1000); } /* ******************************** */ void daemonize() { pid_t pid, sid; pid = fork(); if (pid < 0) exit(EXIT_FAILURE); if (pid > 0) exit(EXIT_SUCCESS); sid = setsid(); if (sid < 0) exit(EXIT_FAILURE); if ((chdir("/")) < 0) exit(EXIT_FAILURE); close(STDIN_FILENO); close(STDOUT_FILENO); close(STDERR_FILENO); } /* ******************************** */ void print_stats() { struct timeval endTime; double deltaMillisec; static u_int8_t print_all; static struct timeval lastTime; char buf0[64], buf1[64], buf2[64]; u_int64_t RXdiff, TXdiff, RXProcdiff; static u_int64_t lastRXPkts = 0, lastTXPkts = 0, lastRXProcPkts = 0; unsigned long long nRXPkts = 0, nTXPkts = 0, nRXProcPkts = 0; pfring_dna_cluster_stat cluster_stats; if(startTime.tv_sec == 0) { gettimeofday(&startTime, NULL); print_all = 0; } else print_all = 1; gettimeofday(&endTime, NULL); deltaMillisec = delta_time(&endTime, &startTime); if(dna_cluster_stats(dna_cluster_handle, &cluster_stats) == 0) { nRXPkts = cluster_stats.tot_rx_packets; nTXPkts = cluster_stats.tot_tx_packets; nRXProcPkts = cluster_stats.tot_rx_processed; fprintf(stderr, "---\nAbsolute Stats:"); if (mode != send_only_mode) { fprintf(stderr, " RX %s pkts", pfring_format_numbers((double)nRXPkts, buf1, sizeof(buf1), 0)); if(print_all) fprintf(stderr, " [%s pkt/sec]", pfring_format_numbers((double)(nRXPkts*1000)/deltaMillisec, buf1, sizeof(buf1), 1)); fprintf(stderr, " Processed %s pkts", pfring_format_numbers((double)nRXProcPkts, buf1, sizeof(buf1), 0)); if(print_all) 
fprintf(stderr, " [%s pkt/sec]", pfring_format_numbers((double)(nRXProcPkts*1000)/deltaMillisec, buf1, sizeof(buf1), 1)); } if (mode != recv_only_mode) { fprintf(stderr, " TX %s pkts", pfring_format_numbers((double)nTXPkts, buf1, sizeof(buf1), 0)); if(print_all) fprintf(stderr, " [%s pkt/sec]", pfring_format_numbers((double)(nTXPkts*1000)/deltaMillisec, buf1, sizeof(buf1), 1)); } fprintf(stderr, "\n"); if (mode != send_only_mode && print_interface_stats) { int i; pfring_stat if_stats; for (i = 0; i < num_dev; i++) { if (pfring_stats(pd[i], &if_stats) >= 0) fprintf(stderr, " %s RX %" PRIu64 " pkts Dropped %" PRIu64 " pkts (%.1f %%)\n", pd[i]->device_name, if_stats.recv, if_stats.drop, if_stats.recv == 0 ? 0 : ((double)(if_stats.drop*100)/(double)(if_stats.recv + if_stats.drop))); } } if(print_all && (lastTime.tv_sec > 0)) { deltaMillisec = delta_time(&endTime, &lastTime); RXdiff = nRXPkts - lastRXPkts; TXdiff = nTXPkts - lastTXPkts; RXProcdiff = nRXProcPkts - lastRXProcPkts; fprintf(stderr, "Actual Stats: "); if (mode != send_only_mode) { fprintf(stderr, " RX %s pkts [%s ms][%s pps]", pfring_format_numbers((double)RXdiff, buf0, sizeof(buf0), 0), pfring_format_numbers(deltaMillisec, buf1, sizeof(buf1), 1), pfring_format_numbers(((double)RXdiff/(double)(deltaMillisec/1000)), buf2, sizeof(buf2), 1)); fprintf(stderr, " Processed %s pkts [%s ms][%s pps]", pfring_format_numbers((double)RXProcdiff, buf0, sizeof(buf0), 0), pfring_format_numbers(deltaMillisec, buf1, sizeof(buf1), 1), pfring_format_numbers(((double)RXProcdiff/(double)(deltaMillisec/1000)), buf2, sizeof(buf2), 1)); } if (mode != recv_only_mode) { fprintf(stderr, " TX %llu pkts [%s ms][%s pps]", (long long unsigned int)TXdiff, pfring_format_numbers(deltaMillisec, buf1, sizeof(buf1), 1), pfring_format_numbers(((double)TXdiff/(double)(deltaMillisec/1000)), buf2, sizeof(buf2), 1)); } fprintf(stderr, "\n"); } lastRXPkts = nRXPkts; lastTXPkts = nTXPkts; lastRXProcPkts = nRXProcPkts; } lastTime.tv_sec = 
endTime.tv_sec, lastTime.tv_usec = endTime.tv_usec; } /* ******************************** */ void my_sigalarm(int sig) { if(do_shutdown) return; print_stats(); alarm(ALARM_SLEEP); signal(SIGALRM, my_sigalarm); } /* ******************************** */ void sigproc(int sig) { static int called = 0; fprintf(stderr, "Leaving...\n"); if(called) return; else called = 1; dna_cluster_disable(dna_cluster_handle); do_shutdown = 1; } /* *************************************** */ void printHelp(void) { printf("pfdnacluster_master - (C) 2012 ntop.org\n\n"); printf("pfdnacluster_master [-a] -i dev\n"); printf("-h Print this help\n"); printf("-i <device> Device name (comma-separated list)\n"); printf("-c <cluster> Cluster ID\n"); printf("-n <num app> Number of applications\n"); printf("-D <num dups> Number of duplicates\n"); printf("-A <num all> Number of all-packet copies\n"); printf("-l <len> Capture length\n"); printf("-R Use Symmetric RSS\n"); printf("-r <core_id> Bind the RX thread to a core\n"); printf("-t <core_id> Bind the TX thread to a core\n"); printf("-m <hash mode> Hashing modes:\n" " 0 - IP hash (default)\n" " 1 - MAC Address hash\n" " 2 - IP protocol hash\n" " 3 - Fan-Out\n"); printf("-s Enable TX\n"); printf("-a Active packet wait\n"); printf("-u <mountpoint> Use hugepages for packet memory allocation\n"); printf("-p Print per-interface absolute stats\n"); printf("-d Daemon mode\n"); exit(0); } /* *************************************** */ struct compact_eth_hdr { unsigned char h_dest[ETH_ALEN]; unsigned char h_source[ETH_ALEN]; u_int16_t h_proto; }; struct compact_ip_hdr { u_int8_t ihl:4, version:4; u_int8_t tos; u_int16_t tot_len; u_int16_t id; u_int16_t frag_off; u_int8_t ttl; u_int8_t protocol; u_int16_t check; u_int32_t saddr; u_int32_t daddr; }; struct compact_ipv6_hdr { __u8 priority:4, version:4; __u8 flow_lbl[3]; __be16 payload_len; __u8 nexthdr; __u8 hop_limit; struct in6_addr saddr; struct in6_addr daddr; }; inline u_int32_t 
master_custom_hash_function(const u_char *buffer, const u_int16_t buffer_len) { u_int32_t l3_offset = sizeof(struct compact_eth_hdr); u_int16_t eth_type; if(hashing_mode == 1 /* MAC hash */) return(buffer[3] + buffer[4] + buffer[5] + buffer[9] + buffer[10] + buffer[11]); eth_type = (buffer[12] << 8) + buffer[13]; while (eth_type == 0x8100 /* VLAN */) { l3_offset += 4; eth_type = (buffer[l3_offset - 2] << 8) + buffer[l3_offset - 1]; } switch (eth_type) { case 0x0800: { /* IPv4 */ struct compact_ip_hdr *iph; if (unlikely(buffer_len < l3_offset + sizeof(struct compact_ip_hdr))) return 0; iph = (struct compact_ip_hdr *) &buffer[l3_offset]; if(hashing_mode == 0 /* IP hash */) return ntohl(iph->saddr) + ntohl(iph->daddr); /* this can be optimized by avoiding calls to ntohl(), but it can lead to balancing issues */ else /* IP protocol hash */ return iph->protocol; } break; case 0x86DD: { /* IPv6 */ struct compact_ipv6_hdr *ipv6h; u_int32_t *s, *d; if (unlikely(buffer_len < l3_offset + sizeof(struct compact_ipv6_hdr))) return 0; ipv6h = (struct compact_ipv6_hdr *) &buffer[l3_offset]; if(hashing_mode == 0 /* IP hash */) { s = (u_int32_t *) &ipv6h->saddr, d = (u_int32_t *) &ipv6h->daddr; return(s[0] + s[1] + s[2] + s[3] + d[0] + d[1] + d[2] + d[3]); } else return(ipv6h->nexthdr); } break; default: return 0; /* Unknown protocol */ } } /* ******************************* */ static int master_distribution_function(const u_char *buffer, const u_int16_t buffer_len, const pfring_dna_cluster_slaves_info *slaves_info, u_int32_t *id_mask, u_int32_t *hash) { u_int32_t slave_idx; /* computing a bidirectional software hash */ *hash = master_custom_hash_function(buffer, buffer_len); /* balancing on hash */ slave_idx = (*hash) % num_app; *id_mask = (dup_mask << slave_idx) | all_mask; return DNA_CLUSTER_PASS; } /* ******************************** */ static int fanout_distribution_function(const u_char *buffer, const u_int16_t buffer_len, const pfring_dna_cluster_slaves_info *slaves_info, 
u_int32_t *id_mask, u_int32_t *hash) { u_int32_t n_zero_bits = 32 - slaves_info->num_slaves; /* returning slave id bitmap */ *id_mask = ((0xFFFFFFFF << n_zero_bits) >> n_zero_bits); return DNA_CLUSTER_PASS; } /* *************************************** */ int main(int argc, char* argv[]) { char c; char buf[32]; u_int32_t version; int rx_bind_core = 0, tx_bind_core = 1; int cluster_id = -1; char *device = NULL, *dev, *dev_pos = NULL, *hugepages_mountpoint = NULL; int daemon_mode = 0; int rss_rehash = 0, flags = 0; startTime.tv_sec = 0; while((c = getopt(argc,argv,"ac:D:A:r:st:hi:l:Rn:m:du:p")) != -1) { switch(c) { case 'a': wait_for_packet = 0; break; case 'r': rx_bind_core = atoi(optarg); break; case 't': tx_bind_core = atoi(optarg); break; case 'h': printHelp(); break; case 's': mode = send_and_recv_mode; break; case 'i': device = strdup(optarg); break; case 'c': cluster_id = atoi(optarg); break; case 'n': num_app = atoi(optarg); break; case 'D': num_dups = atoi(optarg); break; case 'A': num_allcap = atoi(optarg); break; case 'l': snaplen = atoi(optarg); break; case 'R': rss_rehash = 1; break; case 'm': hashing_mode = atoi(optarg); break; case 'd': daemon_mode = 1; break; case 'p': print_interface_stats = 1; break; case 'u': use_hugepages = 1; hugepages_mountpoint = strdup(optarg); break; } } if (cluster_id < 0 || num_app < 1 || num_dups < 1 || num_allcap < 0 || hashing_mode < 0 || hashing_mode > 3) printHelp(); if (hashing_mode == 3 && (num_dups != 1 || num_allcap !=0 )) { printf("WARNING: no duplication with Fan-out hashing mode\n"); num_dups = 1; } if (num_allcap != 0 && num_app == 1) { printf("WARNING: duplicates of single app, should use Fan-out\n"); } if (((num_app * num_dups) + num_allcap) > MAX_NUM_APP) { printf("WARNING: You cannot instantiate more than %u slave applications\n", MAX_NUM_APP); num_app = (MAX_NUM_APP - num_allcap) / num_dups; } dup_mask = 0; all_mask = 0; for (i = 0; i < num_dups; i++) { dup_mask |= (1 << (i * num_app)); } for (i = 0; i < 
num_allcap; i++) { all_mask |= (1 << (i + (num_app * num_dups))); } if (device == NULL) device = strdup(DEFAULT_DEVICE); if (daemon_mode) daemonize(); printf("Capturing from %s\n", device); /* Create the DNA cluster */ if ((dna_cluster_handle = dna_cluster_create(cluster_id, (num_app * num_dups) + num_allcap, 0 /* | DNA_CLUSTER_DIRECT_FORWARDING */ /* | DNA_CLUSTER_NO_ADDITIONAL_BUFFERS */ /* | DNA_CLUSTER_DCA */ | (use_hugepages ? DNA_CLUSTER_HUGEPAGES : 0) )) == NULL) { fprintf(stderr, "Error creating DNA Cluster\n"); return(-1); } /* Changing the default settings (experts only) dna_cluster_low_level_settings(dna_cluster_handle, 8192, // slave rx queue slots 8192, // slave tx queue slots 4096 // slave additional buffers (available with alloc/release) ); */ if (use_hugepages) { if (dna_cluster_set_hugepages_mountpoint(dna_cluster_handle, hugepages_mountpoint) < 0) { fprintf(stderr, "Error setting the hugepages mountpoint: did you mount it?\n"); return(-1); } } /* Setting the cluster mode */ dna_cluster_set_mode(dna_cluster_handle, mode); dev = strtok_r(device, ",", &dev_pos); flags = PF_RING_PROMISC; if (rss_rehash) flags |= PF_RING_DNA_SYMMETRIC_RSS; while(dev != NULL) { pd[num_dev] = pfring_open(dev, snaplen, flags); if(pd[num_dev] == NULL) { printf("pfring_open %s error [%s]\n", dev, strerror(errno)); return(-1); } if (num_dev == 0) { pfring_version(pd[num_dev], &version); printf("Using PF_RING v.%d.%d.%d\n", (version & 0xFFFF0000) >> 16, (version & 0x0000FF00) >> 8, version & 0x000000FF); } snprintf(buf, sizeof(buf), "pfdnacluster_master-cluster-%d-socket-%d", cluster_id, num_dev); pfring_set_application_name(pd[num_dev], buf); /* Add the ring we created to the cluster */ if (dna_cluster_register_ring(dna_cluster_handle, pd[num_dev]) < 0) { fprintf(stderr, "Error registering rx socket\n"); dna_cluster_destroy(dna_cluster_handle); return -1; } num_dev++; dev = strtok_r(NULL, ",", &dev_pos); if (num_dev == MAX_NUM_DEV && dev != NULL) { printf("Too many 
devices\n"); break; } } if (num_dev == 0) { dna_cluster_destroy(dna_cluster_handle); printHelp(); } /* Setting up important details... */ dna_cluster_set_wait_mode(dna_cluster_handle, !wait_for_packet /* active_wait */); dna_cluster_set_cpu_affinity(dna_cluster_handle, rx_bind_core, tx_bind_core); /* The default distribution function allows to balance per IP in a coherent mode (not like RSS that does not do that) */ /* hashing_mode = 0 and no duplication is the same as the default function which is slightly faster */ if ((hashing_mode > 0) || (num_dups > 1)) { if (hashing_mode <= 2) dna_cluster_set_distribution_function(dna_cluster_handle, master_distribution_function); else /* hashing_mode == 3 */ dna_cluster_set_distribution_function(dna_cluster_handle, fanout_distribution_function); } switch(hashing_mode) { case 0: printf("Hashing packets per-IP Address\n"); break; case 1: printf("Hashing packets per-MAC Address\n"); break; case 2: printf("Hashing packets per-IP protocol (TCP, UDP, ICMP...)\n"); break; case 3: printf("Replicating each packet on all applications (no copy)\n"); break; } /* Now enable the cluster */ if (dna_cluster_enable(dna_cluster_handle) < 0) { fprintf(stderr, "Error enabling the engine; dna NICs already in use?\n"); dna_cluster_destroy(dna_cluster_handle); return -1; } printf("The DNA cluster [id: %u][num slave apps: %u] is now running...\n", cluster_id, (num_app * num_dups) + num_allcap); printf("Debug: Dup mask is %d; All mask is %d\n", dup_mask, all_mask); printf("You can now attach to DNA cluster up to %d slaves as follows:\n", (num_app * num_dups) + num_allcap); printf("\tpfcount -i dnacluster:%d\n", cluster_id); signal(SIGINT, sigproc); signal(SIGTERM, sigproc); signal(SIGINT, sigproc); if (!daemon_mode) { signal(SIGALRM, my_sigalarm); alarm(ALARM_SLEEP); } while (!do_shutdown) sleep(1); /* do something in the main */ dna_cluster_destroy(dna_cluster_handle); sleep(2); return(0); }
_______________________________________________ Ntop-misc mailing list [email protected] http://listgateway.unipi.it/mailman/listinfo/ntop-misc
