Hello,
Thanks so much for your script. I think I'm going to use it, and if I
improve it I'll be glad to contribute the changes back (I will send the
patches/modified version to you and/or the mailing list).
Thanks!
Ulisses
On Thu, Feb 09, 2006 at 04:16:35PM -0000, McGlinchy, Alistair wrote:
> Ulisses,
>
> > I'm interested in capturing information from a netflow v5
> > flow, but I would like to aggregate information based on network
> > prefixes or AS.
> >
> > Is this possible with flow-tools? If not, what tool do you suggest?
> >
> > I've tried with flow-export | flow-import but I could not achieve it
> > that way.
>
> Here's my version of flow-aggregate, which is just a big munge of
> flow-export | magic | flow-import. It doesn't do AS aggregation, and the
> subnet mask manipulation could be greatly improved. I post it to inspire
> you as to how to solve your particular problem rather than as a
> ready-made solution. The data path it sets up is sketched below.
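>
> In effect the script wires up this pipeline (a sketch; the middle stage
> is the Perl below, and the file names and compression level come from
> the command line):
>
>   flow-cat -m ft-* | flow-export -f2 | <aggregate in Perl> | flow-import -V7 -f2 -z<n>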
>
> Cheers
>
> Alistair
>
>
> __CODE__
> #!/usr/bin/perl
> # Copyright (C) 2005 Alistair McGlinchy
> # This program is free software; you can redistribute it and/or
> # modify it under the terms of the GNU General Public License
> # as published by the Free Software Foundation; either version 2
> # of the License, or (at your option) any later version.
> #
> # This program is distributed in the hope that it will be useful,
> # but WITHOUT ANY WARRANTY; without even the implied warranty of
> # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> # GNU General Public License for more details.
> #
> # You should have received a copy of the GNU General Public License
> # along with this program; if not, write to the Free Software
> # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> # 02110-1301, USA.
>
> use strict;
> use warnings;
> use Time::Local;
> use Getopt::Long;
>
> # ft_aggregate aggregates flow records from multiple files into combined entries
>
> #############################################################################
> # USAGE
>
> my $usage = <<END_USAGE;
> Usage: $0 [-i <interval>] [-g] [-k <file>] [-z <0-9>] -l <level> [ft_file ...]
>
> Aggregates ft data on the fly.
>
> Options:
> -i interval The resolution in seconds to aggregate over. Must be at
>             least 30. Defaults to 300 secs.
>
> -g          Guesses the protocol spoken from a list of known ports and
>             replaces the 'random' (ephemeral) port field with 0.
> -k <file>   Specify a file containing 'known' ports. Defaults to the
>             list in the script's DATA section.
>
> -z <0-9> Compression level for output file
>
> -l level    Specifies the aggregation level. It is presented as the
>             concatenation of one or more of the following
>             characters:
>
> a - replaces source IP address with '0.0.0.0'
> A - replaces last octet of source IP address with 0
> b - replaces destination IP address with '0.0.0.0'
> B - replaces last octet of destination IP address with 0
> c - replaces source port with 0
> d - replaces destination port with 0
> e - replaces IP protocol with 0
> N - zeroes the non-flow fields (next hop, interfaces, engine,
>     TOS, TCP flags, masks, AS numbers, router, sub-second times)
>
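> Example (file names are illustrative):
>
>   $0 -i 300 -l AB ft-v05.*
>
> aggregates the named files into /24-to-/24 buckets at 5-minute
> resolution.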
> END_USAGE
>
> ########################################################################
> #####
> # Constants
>
> my $export_test = 100_000; # How many records to read before
> # considering whether to flush the database
> my $max_records = 500_000; # Dump if we exceed this many records in the hash
>
> #############################################################################
> # Read arguments and check syntax
>
> my $interval = 300;
> my $level = undef;
> my $guess = 0;
> my $compression = 0;
> my $help = 0;
> my $known_file = undef;
>
> GetOptions(
> "interval|i=i" => \$interval,
> "level|l=s" => \$level,
> "guess|g" => \$guess,
> "compress|z=i" => \$compression,
> "help|h" => \$help,
> "known|k" => \$known_file,
> ) or die "ERROR\n".$usage;
>
> if ($help) {
>     warn $usage;
>     exit 0;
> }
>
> $interval >= 30 or die "Interval must be at least 30 seconds\n" . $usage;
> defined $level or die "Level must be defined. Use -l '' for none.\n" . $usage;
> $level =~ /^[aAbBcdeN]*$/ or die "Level parameter contains an invalid character\n" . $usage;
> $compression =~ /^\d$/ or die "Compression must be an integer from 0 to 9\n" . $usage;
> eval_aggregation_sub($level);
>
> my %known; # Global hash of "known" TCP and UDP port numbers
> load_known_ports();
>
> # If files are provided on the command line, open them with flow-cat;
> # otherwise read flow-export output from STDIN. Zero-byte files are
> # silently ignored.
> my $cmd;
> if (@ARGV) {
>     my @ft_files = grep { -s } map { glob } @ARGV; # Keep non-empty files only
>     warn "$0: Found ", scalar(@ft_files), " ft files on the command line. flow-cat to be executed\n";
>     $cmd = "/usr/local/netflow/bin/flow-cat -m "
>          . join(" ", @ft_files)
>          . " | /usr/local/netflow/bin/flow-export -f2 | ";
> } else {
>     $cmd = "/usr/local/netflow/bin/flow-export -f2 | ";
> }
>
> my $out_cmd = "| /usr/local/netflow/bin/flow-import -V7 -f2 -z$compression ";
>
> #############################################################################
> # Execute the flow-export command to get CSV format data for each record.
>
> my %store; # The cache of aggregated data;
> open my $IN, $cmd or die "Cannot exec input command '$cmd'\n$!";
> warn "Executed input: $cmd\n";
> open my $OUT, $out_cmd or die "Cannot exec output command '$out_cmd'\n$!";
> warn "Executed output: $out_cmd\n";
>
> while(my $record = <$IN>) {
> next if substr($record,0,1) eq "#";
>
> # Field order from flow-export -f2:
> #:unix_secs,unix_nsecs,sysuptime,exaddr,dpkts,doctets,first,last,
> #:engine_type,engine_id,srcaddr,dstaddr,nexthop,input,output,srcport,
> #:dstport,prot,tos,tcp_flags,src_mask,dst_mask,src_as,dst_as,router_sc
>
>     my @fields = split /,/, $record; # Split record into fields by comma.
>     round_times( \@fields );         # Round down time fields to the interval level.
>     apply_agg( \@fields );           # Apply aggregation to fields.
>
>     my $key1 = $fields[0];
>     my $key2 = join ",", @fields[ 1 .. 3 ]; # Questionable decision to separate
>     my $key3 = join ",", @fields[ 6 .. 24 ];
>
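>     # Accumulate packet and octet counts for this bucket: the for loop
>     # aliases $_ to the hash slot, autovivifying the two-element
>     # [dpkts, doctets] accumulator on first use.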
> for ( $store{$key1}{$key2}{$key3} ) {
> $_->[0] += $fields[4]; # dpackets
> $_->[1] += $fields[5]; # doctets
> }
>
>     if ($. % $export_test == 0 ) { # every $export_test records, check cache size
>         my $rec_count = 0;
>         while (my ($key1, $key2_ref) = each %store) {
>             while (my ($key2, $key3_ref) = each %$key2_ref) {
>                 $rec_count += keys %$key3_ref; # just count the keys
>             }
>         }
>         if ( $rec_count > $max_records ) {
>             warn "Warning: $rec_count unique entries in aggregation hash ".
>                  "exceed $max_records. Exporting current stats\n";
>             dump_store();
>         }
>     }
> }
> dump_store();
> close $IN  or die "Cannot close pipe generated from $cmd\n$!";
> close $OUT or die "Cannot close pipe generated from $out_cmd\n$!";
> warn "That's all folks\n";
> exit 0;
>
>
> sub dump_store {
>     # $key3 retains the trailing newline from the input record, so the
>     # prints below do not add one.
>     for my $key1 (sort {$a <=> $b} keys %store ) {
>         while ( my ($key2, $k3_ref ) = each %{$store{$key1}} ) {
>             while (my ($key3, $stats_ref) = each %$k3_ref ) {
>                 if ($stats_ref->[1] < 2**32 and $stats_ref->[0] > 0 ) {
>                     # Fewer than 2**32 octets: we can print this in one go
>                     print $OUT join(",", $key1, $key2, @$stats_ref, $key3);
>                 } else { # ($stats_ref->[1] >= 2**32) # Only possibility
>                     # Too many octets for the 32-bit counter: we need to
>                     # print multiple records
>                     my $dpackets = $stats_ref->[0];
>                     my $doctets  = $stats_ref->[1];
>                     my $flows = int($doctets / (2**32)); # we need to put out this many extra flows
>                     die "Coding logic error" if $flows < 1;
>                     warn "dOctets $doctets will wrap 2**32 bytes, we need to add $flows more records\n";
>                     my $doct_per_flow = 2**32 - 1;
>                     my $dpkt_per_flow = int($dpackets * (2**32 - 1) / $doctets); # keep the same packets:octets ratio
>                     for my $rec (1 .. $flows) {
>                         print $OUT join(",", $key1, $key2, $dpkt_per_flow, $doct_per_flow, $key3);
>                         $dpackets -= $dpkt_per_flow;
>                         $doctets  -= $doct_per_flow;
>                     }
>                     print $OUT join(",", $key1, $key2, $dpackets, $doctets, $key3);
>                 }
>             }
>         }
>     }
>     %store = ();
> }
>
> sub round_times {
>     my $rec = shift;
>     $rec->[0] -= ($rec->[0] % $interval);       # Round unix_secs down to the nearest interval
>     $rec->[1]  = 0;                             # Ensure unix_nsecs is 0
>     $rec->[2] -= ($rec->[2] % ($interval*100)); # Round sysuptime down to the nearest $interval centiseconds
>     $rec->[6] -= ($rec->[6] % ($interval*100)); # Round 'first' down to the nearest $interval centiseconds
>     $rec->[7] -= ($rec->[7] % ($interval*100)); # Round 'last' down to the nearest $interval centiseconds
> }
>
> sub eval_aggregation_sub {
> my $level = shift;
> my $level_code;
>
> $level_code = q( sub apply_agg { );
> $level_code.= q( my $rec=shift; );
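>     # For example, -l 'aB' builds and evals (roughly):
>     #   sub apply_agg { my $rec=shift;
>     #       $rec->[10] = "0.0.0.0";
>     #       $rec->[11] =~ s/\.\d+$/.0/;
>     #   };1;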
>     $level_code.= q( $rec->[10] = "0.0.0.0";     ) if $level =~ /a/;
>     $level_code.= q( $rec->[10] =~ s/\.\d+$/.0/; ) if $level =~ /A/;
>     $level_code.= q( $rec->[11] = "0.0.0.0";     ) if $level =~ /b/;
>     $level_code.= q( $rec->[11] =~ s/\.\d+$/.0/; ) if $level =~ /B/;
>     $level_code.= q( $rec->[15] = "0";           ) if $level =~ /c/;
>     $level_code.= q( $rec->[16] = "0";           ) if $level =~ /d/;
>     $level_code.= q( $rec->[17] = "0";           ) if $level =~ /e/;
>     $level_code.= q( $rec->[12] = "0.0.0.0";
>                      $rec->[24] = "0.0.0.0\n";
>                      @{$rec}[1,2,6..9,13,14,18..23] = (0) x 14; ) if $level =~ /N/;
>
>     # With -g, zero a high port when the other end looks like the service side
>     $level_code.= q(
>         $rec->[15] = 0 if (($known{$rec->[16]} or $rec->[16] < 1025) and $rec->[15] > 1024);
>         $rec->[16] = 0 if (($known{$rec->[15]} or $rec->[15] < 1025) and $rec->[16] > 1024);
>     ) if $guess;
>
> $level_code.= q( };1; );
>
> eval $level_code or die "Cannot eval\n$level_code\n$@ $!";
> }
>
> sub load_known_ports {
> my @raw;
> if ($known_file) {
>         open my $IN, "<", $known_file or die "Cannot read $known_file for ports: $!";
> @raw=<$IN>;
> close $IN;
> } else {
> @raw=<DATA>;
> }
>
>     # I used to add ports 1..1024 into the hash, but it is slower than a
>     # separate <1025 test
>     my @ports = (); # (1..1024);
>
> for (@raw){
> chomp;
> s/#.*//; # remove comments from record;
> push @ports, grep {$_+=0; $_>1024} split /[,\s]+/;
> }
>     @ports or warn(($known_file || "the DATA section") . " did not contain any new ports\n");
>     @known{@ports} = (1) x @ports; # make hash keys
> }
>
> __DATA__
> # Assorted AD listening ports
> 1025,1026,1031,1072,1168,2301,2381,2701,2702
> 3268,3269,3389,13722,13724,13782,13783,49400
>
> 1026, # MS Messenger
> 1066, # VoIP Control
> 1270, # MS Ops Manager
> 1420, # Timbuktu Mac thing
> 1433, # MS SQL Server
> 1477, # SNA Server
> 1478, # SNA Server
> 1494, # Citrix
> 2065, # DLSW
> 2067, # DLSW
> 3389, # Terminal services
> 6800, 6900, 9000, # VoIP traffic
> 8080, # Web Browsing
> 8999, # WebApps Stra
> 9100, # Printer PDL Data Stream
>