Ulisses,

> I'm interested in capturing information from a netflow v5
> flow, but I would like to aggregate information based on network
> prefixes or AS.
> 
> Is this possible with flow-tools? If not, what tool do you suggest?
> 
> I've tried with flow-export | flow-import but I could not achieve it
> that way.

Here's my version of flow-aggregate which is just a big munge of
flow-export | magic | flow-import. It doesn't do AS aggregation and the
subnet mask manipulation could be greatly improved. I post it to inspire
you as to how to solve your particular problem rather than as a
ready-made solution.

Cheers

Alistair


__CODE__
#!/usr/bin/perl
# Copyright (C) 2005  Alistair McGlinchy
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

use strict;
use warnings;
use Time::Local;
use Getopt::Long;

# ft_aggregate aggregates multiple files into a single entry

########################################################################
#####
# USAGE

# Usage text shown for -h and appended to every argument error. It is
# interpolated once at start-up so that $0 shows the invoked script name.
# The option descriptions mirror the checks performed after GetOptions().
my $usage = <<END_USAGE;
Usage: $0 [-h] [-i <interval>] [-g] [-k <file>] [-z <0-9>] -l 'level' [file ...]

  Aggregates ft data on the fly. Flow data is read from the named files,
  or from STDIN when no files are given.

  Options:
    -h           Show this help text and exit.

    -i interval  The resolution in seconds to aggregate over. Must be at
                 least 30. Defaults to 300 secs

    -g           Guesses protocol spoken from a list of known ports and
                 replaces the 'random' port field with 0
    -k <file>    Specify file name containing 'known' ports file.

    -z <0-9>     Compression level for output file

    -l level     Specifies aggregation level.  It is presented as the
                 concatenation of one or more of the following characters
                 (use -l '' for none):

       a - replaces source IP address with '0.0.0.0'
       A - replaces last octet of source IP address with 0
       b - replaces destination IP address with '0.0.0.0'
       B - replaces last octet of destination IP address with 0
       c - replaces source port with 0
       d - replaces destination port with 0
       e - replaces ip protocol with 0
       N - zeroes the remaining non-key fields (nexthop, router_sc,
           interfaces, tos, tcp_flags, masks, AS numbers, engine and
           the uptime/first/last timers)

END_USAGE

########################################################################
#####
# Constants

# Tuning knobs for the in-memory aggregation cache used by the main loop.
my $export_test  = 100_000; # How many records to read before
                            # considering whether to flush the database
my $max_records  = 500_000; # Dump if we exceed this many records in the
                            # hash

########################################################################
#####
# Read arguments and check syntax

# Option defaults; each may be overridden from the command line.
my $interval    = 300;      # Aggregation resolution in seconds
my $level       = undef;    # Aggregation level letters (mandatory, may be '')
my $guess       = 0;        # True when -g asks for port guessing
my $compression = 0;        # Compression level handed to flow-import
my $help        = 0;        # True when -h was given
my $known_file  = undef;    # Optional file of 'known' port numbers

GetOptions(
    "interval|i=i"  => \$interval,
    "level|l=s"     => \$level,
    "guess|g"       => \$guess,
    "compress|z=i"  => \$compression,
    "help|h"        => \$help,
    "known|k=s"     => \$known_file,   # '=s': -k takes the file name as its value
) or die "ERROR\n".$usage;

# Answer a help request before validating, so that a bare -h does not fail
# on the otherwise mandatory -l option. The text goes to STDERR.
if ($help){
    warn $usage;
    exit 0;
}

$interval >= 30             or die "Interval must be at least 30 seconds\n"            . $usage;
defined $level              or die "Level must be defined. Use -l '' for none.\n"      . $usage;
$level =~ /^[aAbBcdeN]*$/   or die "Level parameter contains incorrect character\n"    . $usage;
$compression =~/^\d$/       or die "Compression must be an integer from 0 to 9\n"      . $usage;

my %known;  # Global var hash of "known" TCP and UDP port numbers
load_known_ports();

# Generate apply_agg() for the requested level (and -g, if given).
eval_aggregation_sub($level);

# Build the input pipeline. If files are named on the command line they are
# concatenated with flow-cat; otherwise flow-export reads flow data from STDIN.
# Zero-byte (and non-existent) files are silently ignored. Note that no check
# is made on the file name itself.
my $cmd;
if (@ARGV) {
    my @ft_files = grep { -s } map { glob } @ARGV;  # Extract non-empty files.
    warn "$0: Found ",scalar(@ft_files)," ft files on the command line. flow-cat to be executed\n";

    # The command is run through the shell, so single-quote every file name
    # to keep shell metacharacters in it from being interpreted.
    my @quoted = map { my $file = $_; $file =~ s/'/'\\''/g; "'$file'" } @ft_files;

    $cmd = "/usr/local/netflow/bin/flow-cat -m "
         . join(" ",@quoted)
         . " | /usr/local/netflow/bin/flow-export -f2 | ";
} else {
    $cmd = "/usr/local/netflow/bin/flow-export -f2 | ";
}

# Output pipeline: aggregated CSV records are fed to flow-import. The whole
# command must stay on one line, otherwise the shell would treat the -z
# option as a separate command.
my $out_cmd = "| /usr/local/netflow/bin/flow-import -V7 -f2 -z$compression ";

########################################################################
#####
# Execute the flow export command to get CSV format data for each record.

my %store; # The cache of aggregated data;

# $cmd ends in '|' and $out_cmd starts with '|', so these two-argument opens
# start the reader and writer pipelines respectively.
open my $IN,  $cmd     or die "Cannot exec input command '$cmd'\n$!";
warn "Executed input: $cmd\n";
open my $OUT, $out_cmd or die "Cannot exec output command '$out_cmd'\n$!";
warn "Executed output: $out_cmd\n";

while(my $record = <$IN>) {
    next if substr($record,0,1) eq "#";

    # Field order of the CSV produced by flow-export:
    #:unix_secs,unix_nsecs,sysuptime,exaddr,dpkts,doctets,first,last,
    # engine_type,engine_id,srcaddr,dstaddr,nexthop,input,output,srcport,
    # dstport,prot,tos,tcp_flags,src_mask,dst_mask,src_as,dst_as,router_sc

    # The record is deliberately not chomped: the last field keeps the
    # newline, which ends each line written by dump_store().
    my @fields = split /,/, $record; # Split record into fields by comma.
    round_times( \@fields );         # Round down time fields to the interval level.
    apply_agg(   \@fields );         # Apply aggregation to fields;

    my $key1 = $fields[0];
    my $key2 = join ",", @fields[ 1 ..  3 ]; # Questionable decision to separate
    my $key3 = join ",", @fields[ 6 .. 24 ];

    # Accumulate the counters for this aggregation key.
    for ( $store{$key1}{$key2}{$key3} ) {
        $_->[0] += $fields[4]; # dpackets
        $_->[1] += $fields[5]; # doctets
    }

    # Every $export_test input lines, count the distinct entries held and
    # flush the cache if it has grown beyond $max_records.
    if ($. % $export_test == 0 ) {
        my $rec_count = 0;
        for my $by_key2 (values %store) {
            $rec_count += keys %$_ for values %$by_key2; # just count the keys
        }
        if ( $rec_count > $max_records ) {
            warn "Warning. More than $rec_count unique entries in ".
                 "aggregation hash. Exporting current stats\n";
            dump_store();
        }
    }
}
dump_store();

# Closing a pipe waits for the child and reports its exit status, so a
# failing flow-import or flow-export is noticed instead of silently ignored.
close $OUT or die "Cannot close pipe to $out_cmd\n$! (exit status $?)";
close $IN  or die "Cannot close pipe generated from $cmd\n$!";
warn "Thats all folks\n";
exit 0;


# dump_store()
#
# Writes every aggregated entry held in %store to $OUT as CSV and then
# empties the cache. Entries are written in ascending order of the first key
# (the rounded unix_secs value); the order within one timestamp is arbitrary.
#
# Each line is rebuilt as key1,key2,dpkts,doctets,key3. No newline is added
# here: key3 ends with the record's last field, which still carries the
# newline of the input line.
#
# An entry whose octet total does not fit in 32 bits is written as several
# records of at most 2**32-1 octets each, with the packets spread over them
# in proportion (whole packets only, any rest goes into the last record).
sub dump_store {
    my $max_octets = 2**32 - 1;   # Largest octet count one record may carry

    for my $key1 (sort {$a <=> $b} keys %store ) {
        while (    my ($key2, $k3_ref   ) = each %{$store{$key1}} ) {
            while (my ($key3, $stats_ref) = each %$k3_ref         ) {
                my ($dpackets, $doctets) = @$stats_ref;

                if ($doctets > $max_octets) {
                    # Number of full records needed so that the remainder
                    # also fits into 32 bits.
                    my $flows = int( ($doctets - 1) / $max_octets );
                    warn "dOctets $doctets will wrap 2**32 bytes, we need to add $flows more records\n";

                    # Keep the same packet/octet ratio, rounded down so that
                    # the remaining packet count can never go negative.
                    my $dpkt_per_flow = int( $max_octets / $doctets * $dpackets );

                    for my $rec (1 .. $flows) {
                        print $OUT join(",", $key1, $key2, $dpkt_per_flow, $max_octets, $key3);
                        $dpackets -= $dpkt_per_flow;
                        $doctets  -= $max_octets;
                    }
                }

                # The whole entry, or whatever is left after splitting.
                print $OUT join(",", $key1, $key2, $dpackets, $doctets, $key3);
            }
        }
    }
    %store =();
}

# round_times($rec)
#
# Rounds the time fields of one record down to a multiple of the global
# $interval so that records from the same interval share a key.
#   $rec - array reference holding the fields of one record; modified in place.
#
# NOTE(review): the factor 100 treats sysuptime/first/last as centiseconds.
# NetFlow v5 documents these timers in milliseconds - confirm the unit that
# flow-export prints before relying on the rounding of fields 2, 6 and 7.
sub round_times {
    my $rec= shift;
    $rec->[0] -= ($rec->[0] % $interval)      ;  # unix_secs: round down to nearest interval
    $rec->[1]  = 0                            ;  # unix_nsecs: sub-second part is always 0
    $rec->[2] -= ($rec->[2] % ($interval*100));  # sysuptime: round down to $interval centi seconds
    $rec->[6] -= ($rec->[6] % ($interval*100));  # first:     round down to $interval centi seconds
    $rec->[7] -= ($rec->[7] % ($interval*100));  # last:      round down to $interval centi seconds
}

# eval_aggregation_sub($level)
#
# Builds the source of sub apply_agg() from the aggregation level letters and
# compiles it with a string eval, so that the per-record code contains only
# the statements that were actually requested.
#   $level - string of level letters (a A b B c d e N), already validated.
# Also reads the global $guess; the generated code reads the global %known.
# Dies, showing the generated source, if it fails to compile.
#
# The generated apply_agg($rec) takes an array reference with the fields of
# one record and rewrites the selected fields in place.
sub eval_aggregation_sub {
    my $level = shift;
    my $level_code;

    $level_code = q( sub apply_agg {                            );
    $level_code.= q(     my $rec=shift;                         );
    $level_code.= q(     $rec->[10] ="0.0.0.0";                 ) if $level=~/a/;
    $level_code.= q(     $rec->[10] =~ s/\.\d+$/.0/;            ) if $level=~/A/;
    $level_code.= q(     $rec->[11] ="0.0.0.0";                 ) if $level=~/b/;
    $level_code.= q(     $rec->[11] =~ s/\.\d+$/.0/;            ) if $level=~/B/;
    $level_code.= q(     $rec->[15] ="0";                       ) if $level=~/c/;
    $level_code.= q(     $rec->[16] ="0";                       ) if $level=~/d/;
    $level_code.= q(     $rec->[17] ="0";                       ) if $level=~/e/;
    $level_code.= q(     $rec->[12] = "0.0.0.0";
                         $rec->[24] = "0.0.0.0\n";
                         @{$rec}[1,2,6..9,13,14,18..23]= (0)x14;) if $level=~/N/;

    # Port guessing: when one side is a low or 'known' port and the other is
    # a high port, the high one is taken to be the random client port and is
    # zeroed. Both decisions are made on a snapshot of the two ports; testing
    # the source port after it had just been zeroed would zero a known high
    # destination port as well.
    $level_code.= q(
          my ($src_port, $dst_port) = @{$rec}[15,16];
          $rec->[15] = 0 if (($known{$dst_port} or $dst_port < 1025) and $src_port > 1024);
          $rec->[16] = 0 if (($known{$src_port} or $src_port < 1025) and $dst_port > 1024);
                                                                ) if $guess;

    $level_code.= q( };1;                                       );

    eval $level_code or die "Cannot eval\n$level_code\n$@ $!";
}

# load_known_ports()
#
# Fills the global %known with the 'known' port numbers, each mapped to 1.
# The list is read from the file named by the global $known_file when one was
# given, otherwise from the __DATA__ section of this script.
#
# Input format: port numbers separated by commas and/or whitespace; anything
# after a '#' on a line is a comment. Only whole numbers from 1025 to 65535
# are kept; ports up to 1024 are covered by a separate numeric test in the
# generated aggregation code, and any other token is ignored.
# Dies if the file cannot be opened; warns if no usable port was found.
sub load_known_ports {
    my @raw;
    my $from_file = defined $known_file && length $known_file;

    if ($from_file) {
        open my $IN, "<", $known_file
            or die "Cannot read $known_file for ports : $!";
        @raw=<$IN>;
        close $IN;
    } else {
        @raw=<DATA>;
    }

    # I used to add 1..1024 ports into the hash, but it is slower than a
    # separate <1025 test;
    my @ports=(); # (1..1024);

    for my $line (@raw){
        (my $text = $line) =~ s/#.*//; # remove comments from record;
        push @ports,
            map  { $_ + 0 }                                   # "01433" and "1433" are one key
            grep { /^\d+$/ and $_ > 1024 and $_ <= 65535 }    # valid high ports only
            split /[,\s]+/, $text;
    }

    my $source = $from_file ? $known_file : "The built-in port list";
    @ports or warn "$source did not contain any new ports\n";
    @known{@ports}=(1) x @ports; # make hash keys;
}

__DATA__
# Assorted AD listening ports
1025,1026,1031,1072,1168,2301,2381,2701,2702
3268,3269,3389,13722,13724,13782,13783,49400

1026, # MS Messenger
1066, # VoIP Control
1270, # MS Ops Manager
1420, # Timbuktu Mac thing
1433, # MS SQL Server
1477, # SNA Server
1478, # SNA Server
1494, # Citrix
2065, # DLSW
2067, # DLSW
3389, # Terminal services
6800, 6900, 9000, # VoIP traffic
8080, # Web Browsing
8999, # WebApps Stra
9100, # Printer PDL Data Stream

**********************************************************************
Registered Office:
Marks and Spencer plc
Waterside House
35 North Wharf Road
London
W2 1NW

Registered No. 214436 in England and Wales.

Telephone (020) 7935 4422
Facsimile (020) 7487 2670

<<www.marksandspencer.com>>

Please note that electronic mail may be monitored.

This e-mail is confidential. If you received it by mistake, please let us know 
and then delete it from your system; you should not copy, disclose, or 
distribute its contents to anyone nor act in reliance on this e-mail, as this 
is prohibited and may be unlawful.
2005


_______________________________________________
Flow-tools mailing list
[EMAIL PROTECTED]
http://mailman.splintered.net/mailman/listinfo/flow-tools

Reply via email to