Hello

Thanks so much for your script, I think I'm going to use it and if I 
improve it I will not have any problem in contributing to the public
(I will send you/to tha mailing list the paches/modified version)

Thanks!

        Ulisses

On Thu, Feb 09, 2006 at 04:16:35PM -0000, McGlinchy, Alistair wrote:
> Ulisses,
> 
> > I'm interested in capturing information from a netflow v5
> > flow, but I would like aggregate information based on nework 
> > prefixes or AS.
> > 
> > Is this possible with flow-tools? If not, what tool do you suggest?
> > 
> > I've tried with flow-export | flow-import but I could not achieve it
> > that way.
> 
> Here's my version of flow-aggregate which is just a big munge of
> flow-export | magic | flow-import. It doesn't do AS aggregation and the
> subnet mask manipulation could be greatly improved. I post it to inspire
> you as to how to solve your particular problem rather than as a
> ready-made solution.
> 
> Cheers
> 
> Alistair
> 
> 
> __CODE__
> #!/usr/bin/perl
> # Copyright (C) 2005  Alistair McGlinchy
> # This program is free software; you can redistribute it and/or
> # modify it under the terms of the GNU General Public License
> # as published by the Free Software Foundation; either version 2
> # of the License, or (at your option) any later version.
> #
> # This program is distributed in the hope that it will be useful,
> # but WITHOUT ANY WARRANTY; without even the implied warranty of
> # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> # GNU General Public License for more details.
> #
> # You should have received a copy of the GNU General Public License
> # along with this program; if not, write to the Free Software
> # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301, USA.
> 
> use strict;
> use warnings;
> use Time::Local;
> use Getopt::Long;
> 
> # ft_aggregate aggregates multiple files into a single entry
> 
> ########################################################################
> #####
> # USAGE
> 
> my $usage = <<END_USAGE;
> Usage: $0 [-i <interval>] -l 'level'
> 
>   Aggregates ft data on the fly.
> 
>   Options:
>     -i interval  The resolution in seconds to aggregate over Specifying
>                  an and interval of 0 will cause all timestamps to be
>                  set to 0. Defaults to 300 secs
> 
>     -g           Guesses protocol spoken from a list of known ports and
>                  replaces the 'random' port field with 0
>     -k <file>    Specify file name containing 'known' ports file.
> 
>     -z <0-9>     Compression level for output file
> 
>     -l level     Specifies aggrgation level.  It is presented as the
>                  concatenation of one  or more of the following
> characters:
> 
>        a - replaces source IP address with '0.0.0.0'
>        A - replaces last octet of source IP address with 0
>        b - replaces destination IP address with '0.0.0.0'
>        B - replaces last octet of destination IP address with 0
>        c - replaces source port as 0
>        d - replaces destinatin port as 0
>        e - replaces ip protocol as 0
>        N - non-fx related records
> 
> END_USAGE
> 
> ########################################################################
> #####
> # Constants
> 
> my $export_test  = 100_000; # How many records to read before
>                             # considering whether to flush the database
> my $max_records  = 500_000; # Dump if we exceed this many records in the
> hash;
> 
> ########################################################################
> #####
> # Read arguments and check syntax
> 
> my $interval    = 300;
> my $level       = undef;
> my $guess       = 0;
> my $compression = 0;
> my $help        = 0;
> my $known_file  = undef;
> 
> GetOptions(
>     "interval|i=i"  => \$interval,
>     "level|l=s"     => \$level,
>     "guess|g"       => \$guess,
>     "compress|z=i"  => \$compression,
>     "help|h"        => \$help,
>     "known|k"       => \$known_file,
> ) or die "ERROR\n".$usage;
> 
> $interval >= 30             or die "Interval must be at least 30
> seconds\n".         $usage;
> defined $level              or die "Level must be defined. Use -l '' for
> none.\n".   $usage;
> $level =~ /^[aAbBcdeN]*$/   or die "Level parameter contains incorrect
> character\n". $usage;
> $compression =~/^\d$/       or die "Compression must be an integer from
> 0 to 9\n"  . $usage;
> eval_aggregation_sub($level);
> 
> my %known;  # Global var hash of "known" TCP and UDP port numbers 
> load_known_ports();
> 
> if ($help){
>     warn $usage;
>     exit 0;
> }
> 
> # If files are provided from the command line the open them with fcat 
> # from STDIN. Silently ignore 0 byte files and ones not beginning with
> ft
> my $cmd;
> if (@ARGV) {
>     my @ft_files = grep { -s } map { glob }  @ARGV;  # Extract non-empty
> ft* files.
>     warn "$0: Found ",scalar(@ft_files)," ft files on the command line.
> flow-cat to be executed\n";
>     $cmd = "/usr/local/netflow/bin/flow-cat -m "
>          . join(" ",@ft_files)
>          . " | /usr/local/netflow/bin/flow-export -f2 | ";
> } else {
>     $cmd = "/usr/local/netflow/bin/flow-export -f2 | ";
> }
> 
> my $out_cmd = "| /usr/local/netflow/bin/flow-import -V7 -f2
> -z$compression ";
> 
> ########################################################################
> #####
> # Excute the flow export command to get CSV format data for each record.
> 
> my %store; # The cache of aggregated data;
> open my $IN,  $cmd     or die "Cannot exec input command '$cmd'\n$!";
> warn "Executed input: $cmd\n";
> open my $OUT, $out_cmd or die "Cannot exec output command
> '$out_cmd'\n$!";
> warn "Executed output: $out_cmd\n";
> 
> while(my $record = <$IN>) {
>     next if substr($record,0,1) eq "#";
>  
> #:unix_secs,unix_nsecs,sysuptime,exaddr,dpkts,doctets,first,last,engine_
> type,engine_id,srcaddr,dstaddr,nexthop,input,output,srcport,dstport,prot
> ,tos,tcp_flags,src_mask,dst_mask,src_as,dst_as,router_sc
> 
>     my @fields = split /,/, $record; # Split record into fields by
> comma.
>     round_times( [EMAIL PROTECTED] );         # Round down time fields to the
> interval level.
>     apply_agg(   [EMAIL PROTECTED] );         # Apply aggregation to fields;
> 
>     my $key1 = $fields[0];
>     my $key2 = join ",", @fields[ 1 ..  3 ]; # Questionable decision to
> separate
>     my $key3 = join ",", @fields[ 6 .. 24 ];
> 
>     for ( $store{$key1}{$key2}{$key3} ) {
>         $_->[0] += $fields[4]; # dpackets
>         $_->[1] += $fields[5]; # doctets
>     }
> 
>     if ($. % $export_test == 0 ) { # then we have
>         my $rec_count;
>         while (my ($key1, $key2_ref) = each %store) {
>             while (my ($key2, $key3_ref) = each %$key2_ref) {
>                 $rec_count += keys %$key3_ref; # just count the keys
>             }
>         }
>         if ( $rec_count > $max_records ) {
>             warn "Warning. More than $rec_count unique entries in ".
>                  "aggregation hash. Exporting current stats\n";
>             dump_store();
>         }
>     }
> }
> dump_store();
> close $IN or die "Cannot close pipe generated from $cmd\n$!";
> warn "Thats all folks\n";
> exit 0;
> 
> 
> sub dump_store {
>     for my $key1 (sort {$a <=> $b} keys %store ) {
>         while (    my ($key2, $k3_ref   ) = each %{$store{$key1}} ) {
>             while (my ($key3, $stats_ref) = each %$k3_ref         ) {
>                 if ($stats_ref->[1] < 2**32 and $stats_ref->[0]>0 ) {
>                     # Less than 2**32 octets we can print this in one go
>                     print $OUT join(",", $key1, $key2, @$stats_ref,
> $key3);
>                 } else { #  ($stats_ref->[1] > 2**32)   # Only
> possibility
>                     # Too many octets we need to print multiple records
>                     my $dpackets      = $stats_ref->[0];
>                     my $doctets       = $stats_ref->[1];
>                     my $flows         = int($doctets/ (2**32)); # we
> need to put out this many flows
>                     die "Coding logic error" if $flows     < 1;
>                     warn "dOctets $doctets will wrap 2**32 bytes, we
> need to add $flows more records\n";
>                     my $doct_per_flow =  2**32 -1 ;
>                     my $dpkt_per_flow = (2**32 -1) / $doctets *
> $dpackets; # keep the same ratio
>                     for my $rec (1 .. $flows) {
>                         print $OUT join(",", $key1, $key2,
> $dpkt_per_flow, $doct_per_flow, $key3);
>                         $dpackets -= $dpkt_per_flow;
>                         $doctets  -= $doct_per_flow;
>                     }
>                     print $OUT join(",", $key1,$key2,$dpackets,
> $doctets, $key3);
>                 }
>             }
>         }
>     }
>     %store =();
> }
> 
> sub round_times {
>     my $rec= shift;
>     $rec->[0] -= ($rec->[0] % $interval)      ;  # Round down to nearest
> inetrval 
>     $rec->[1]  = 0                            ;  # Ensure micro seconds
> is 0
>     $rec->[2] -= ($rec->[2] % ($interval*100));  # Round down to nearest
> $interval centi seconds
>     $rec->[6] -= ($rec->[6] % ($interval*100));  # Round down to nearest
> $interval centi seconds
>     $rec->[7] -= ($rec->[7] % ($interval*100));  # Round down to nearest
> $interval centi seconds
> }
> 
> sub eval_aggregation_sub {
>     my $level = shift;
>     my $level_code;
> 
>     $level_code = q( sub apply_agg {                            );
>     $level_code.= q(     my $rec=shift;                         );
>     $level_code.= q(     $rec->[10] ="0.0.0.0";                 ) if
> $level=~/a/;
>     $level_code.= q(     $rec->[10] =~ s/\.\d+$/.0/;            ) if
> $level=~/A/;
>     $level_code.= q(     $rec->[11] ="0.0.0.0";                 ) if
> $level=~/b/;
>     $level_code.= q(     $rec->[11] =~ s/\.\d+$/.0/;            ) if
> $level=~/B/;
>     $level_code.= q(     $rec->[15] ="0";                       ) if
> $level=~/c/;
>     $level_code.= q(     $rec->[16] ="0";                       ) if
> $level=~/d/;
>     $level_code.= q(     $rec->[17] ="0";                       ) if
> $level=~/e/;
>     $level_code.= q(     $rec->[12] = "0.0.0.0";
>                          $rec->[24] = "0.0.0.0\n";
>                          @{$rec}[1,2,6..9,13,14,18..23]= (0)x14;) if
> $level=~/N/;
> 
>     $level_code.= q(
>           $rec->[15] = 0 if (($known{$rec->[16]} or $rec->[16]< 1025)
> and $rec->[15] > 1024);
>           $rec->[16] = 0 if (($known{$rec->[15]} or $rec->[15]< 1025)
> and $rec->[16] > 1024);
>                                                                 ) if
> $guess;
> 
>     $level_code.= q( };1;                                       );
> 
>     eval $level_code or die "Cannot eval\n$level_code\n$@ $!";
> }
> 
> sub load_known_ports {
>     my @raw;
>     if ($known_file) {
>         open my $IN, "<", $known_file or die "Cannot read $known_file
> for ports : $_";
>         @raw=<$IN>;
>        close $IN;
>     } else {
>         @raw=<DATA>;
>     }
> 
> #    I used to add 1..1024 ports into the hash, but it is slower than a
> separate <1025 test;
>    my @ports=(); # (1..1024);
> 
>     for (@raw){
>         chomp;
>         s/#.*//; # remove comments from record;
>         push @ports, grep {$_+=0; $_>1024} split /[,\s]+/;
>     }
>     @ports or warn "$known_file did not contain any new ports\n";
>     @[EMAIL PROTECTED](1) x @ports; # make hash keys;
> }
> 
> __DATA__
> # Assorted AD listening ports
> 1025,1026,1031,1072,1168,2301,2381,2701,2702
> 3268,3269,3389,13722,13724,13782,13783,49400
> 
> 1026, # MS Messenger
> 1066, # VoIP Control
> 1270, # MS Ops Manager
> 1420, # Timbuktu Mac thing
> 1433, # MS SQL Server
> 1477, # SNA Server
> 1478, # SNA Server
> 1494, # Citrix
> 2065, # DLSW
> 2067, # DLSW
> 3389, # Terminal services
> 6800, 6900, 9000, # VoIP traffic
> 8080, # Web Browsing
> 8999, # WebApps Stra
> 9100, # Printer PDL Data Stream
> 
> **********************************************************************
> Registered Office:
> Marks and Spencer plc
> Waterside House
> 35 North Wharf Road
> London
> W2 1NW
> 
> Registered No. 214436 in England and Wales.
> 
> Telephone (020) 7935 4422
> Facsimile (020) 7487 2670
> 
> <<www.marksandspencer.com>>
> 
> Please note that electronic mail may be monitored.
> 
> This e-mail is confidential. If you received it by mistake, please let us 
> know and then delete it from your system; you should not copy, disclose, or 
> distribute its contents to anyone nor act in reliance on this e-mail, as this 
> is prohibited and may be unlawful.
> 2005
> 
> 
> 
_______________________________________________
Flow-tools mailing list
[EMAIL PROTECTED]
http://mailman.splintered.net/mailman/listinfo/flow-tools

Reply via email to