Ulisses,
> I'm interested in capturing information from a netflow v5
> flow, but I would like to aggregate information based on network
> prefixes or AS.
>
> Is this possible with flow-tools? If not, what tool do you suggest?
>
> I've tried with flow-export | flow-import but I could not achieve it
> that way.
Here's my version of flow-aggregate, which is just a big munge of
flow-export | magic | flow-import. It doesn't do AS aggregation and the
subnet mask manipulation could be greatly improved. I post it to inspire
you as to how to solve your particular problem rather than as a
ready-made solution.
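For the prefix part of your question: the script below only knows how to
zero the last octet (the A/B levels), but you could add a level that masks
addresses down to an arbitrary prefix length. A rough sketch (mask_addr
and the prefix-length option are my invention, not part of the script):

sub mask_addr {
    # Mask a dotted-quad address down to a given prefix length.
    my ($addr, $prefix_len) = @_;
    my $ip   = unpack "N", pack "C4", split /\./, $addr; # dotted quad -> 32-bit int
    my $mask = $prefix_len ? ~0 << (32 - $prefix_len) : 0;
    return join ".", unpack "C4", pack "N", $ip & $mask; # and back again
}
# e.g. mask_addr("10.1.2.3", 24) returns "10.1.2.0"

For AS aggregation, src_as and dst_as (fields 22 and 23) are already in
the flow-export output, so you could key on those and zero the addresses
instead.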
Cheers
Alistair
__CODE__
#!/usr/bin/perl
# Copyright (C) 2005 Alistair McGlinchy
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
use strict;
use warnings;
use Time::Local;
use Getopt::Long;
# ft_aggregate aggregates multiple flow records into single entries
########################################################################
# USAGE
my $usage = <<END_USAGE;
Usage: $0 [-i <interval>] [-g] [-k <file>] [-z <0-9>] -l <level> [files...]
Aggregates ft data on the fly.
Options:
-i interval The resolution in seconds to aggregate over. Must be at
            least 30 seconds. Defaults to 300 secs.
-g          Guesses the protocol spoken from a list of known ports and
            replaces the 'random' (ephemeral) port field with 0
-k <file>   Specify the name of a file containing 'known' ports.
-z <0-9>    Compression level for the output file
-l level    Specifies the aggregation level. It is presented as the
            concatenation of one or more of the following characters:
            a - replaces source IP address with '0.0.0.0'
            A - replaces last octet of source IP address with 0
            b - replaces destination IP address with '0.0.0.0'
            B - replaces last octet of destination IP address with 0
            c - replaces source port with 0
            d - replaces destination port with 0
            e - replaces IP protocol with 0
            N - zeroes every field except timestamp, addresses, ports,
                protocol and the packet/octet counters
END_USAGE
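# Example invocation (file names are illustrative; the flow-tools path is
# hard-coded to /usr/local/netflow/bin below):
#   flow-cat ft-v05.* | ./ft_aggregate -i 300 -l cde -z6 > ft-aggregated
# or, with the raw files on the command line:
#   ./ft_aggregate -i 300 -l abe ft-v05.* > ft-aggregated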
########################################################################
# Constants
my $export_test = 100_000; # How many records to read before considering
                           # whether to flush the database
my $max_records = 500_000; # Dump if we exceed this many records in the hash
########################################################################
# Read arguments and check syntax
my $interval = 300;
my $level = undef;
my $guess = 0;
my $compression = 0;
my $help = 0;
my $known_file = undef;
GetOptions(
"interval|i=i" => \$interval,
"level|l=s" => \$level,
"guess|g" => \$guess,
"compress|z=i" => \$compression,
"help|h" => \$help,
"known|k" => \$known_file,
) or die "ERROR\n".$usage;
if ($help){
warn $usage;
exit 0;
}
$interval >= 30 or die "Interval must be at least 30 seconds\n" . $usage;
defined $level or die "Level must be defined. Use -l '' for none.\n" . $usage;
$level =~ /^[aAbBcdeN]*$/ or die "Level parameter contains an incorrect character\n" . $usage;
$compression =~ /^\d$/ or die "Compression must be an integer from 0 to 9\n" . $usage;
eval_aggregation_sub($level);
my %known; # Global var: hash of "known" TCP and UDP port numbers
load_known_ports();
# If files are given on the command line, open them via flow-cat;
# otherwise flow-export reads from STDIN. Silently ignore 0-byte files
# and ones not beginning with ft.
my $cmd;
if (@ARGV) {
my @ft_files = grep { -s } map { glob } @ARGV; # Extract non-empty ft* files.
warn "$0: Found ", scalar(@ft_files), " ft files on the command line. flow-cat to be executed\n";
$cmd = "/usr/local/netflow/bin/flow-cat -m "
. join(" ",@ft_files)
. " | /usr/local/netflow/bin/flow-export -f2 | ";
} else {
$cmd = "/usr/local/netflow/bin/flow-export -f2 | ";
}
my $out_cmd = "| /usr/local/netflow/bin/flow-import -V7 -f2 -z$compression ";
########################################################################
# Execute the flow-export command to get CSV format data for each record.
my %store; # The cache of aggregated data;
open my $IN, $cmd or die "Cannot exec input command '$cmd'\n$!";
warn "Executed input: $cmd\n";
open my $OUT, $out_cmd or die "Cannot exec output command '$out_cmd'\n$!";
warn "Executed output: $out_cmd\n";
while(my $record = <$IN>) {
next if substr($record,0,1) eq "#";
#:unix_secs,unix_nsecs,sysuptime,exaddr,dpkts,doctets,first,last,engine_type,engine_id,srcaddr,dstaddr,nexthop,input,output,srcport,dstport,prot,tos,tcp_flags,src_mask,dst_mask,src_as,dst_as,router_sc
my @fields = split /,/, $record; # Split record into fields by comma.
round_times( \@fields ); # Round down time fields to the interval level.
apply_agg( \@fields );   # Apply aggregation to the fields.
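# The cache is keyed in three levels so that dump_store can emit records
# sorted by timestamp: key1 is the rounded unix_secs, key2 the
# nsecs/sysuptime/exaddr triple, and key3 everything from 'first' onwards.
# Fields 4 and 5 (dpkts, doctets) are the values being accumulated.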
my $key1 = $fields[0];
my $key2 = join ",", @fields[ 1 .. 3 ]; # Questionable decision to separate
my $key3 = join ",", @fields[ 6 .. 24 ];
for ( $store{$key1}{$key2}{$key3} ) {
$_->[0] += $fields[4]; # dpackets
$_->[1] += $fields[5]; # doctets
}
if ($. % $export_test == 0 ) { # Periodically check the cache size
my $rec_count = 0;
while (my ($key1, $key2_ref) = each %store) {
while (my ($key2, $key3_ref) = each %$key2_ref) {
$rec_count += keys %$key3_ref; # just count the keys
}
}
if ( $rec_count > $max_records ) {
warn "Warning. More than $rec_count unique entries in ".
"aggregation hash. Exporting current stats\n";
dump_store();
}
}
}
dump_store();
close $IN or die "Cannot close pipe generated from $cmd\n$!";
close $OUT or die "Cannot close pipe generated from $out_cmd\n$!";
warn "That's all, folks\n";
exit 0;
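# dump_store flushes the aggregation cache through flow-import, sorted by
# timestamp. doctets is a 32-bit counter in the flow record, so an
# aggregate exceeding 2**32-1 octets must be split over several records:
# e.g. 10_000_000_000 octets is emitted as two records of 2**32-1 octets
# (packets pro-rated to keep the ratio) plus one remainder record.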
sub dump_store {
    for my $key1 (sort {$a <=> $b} keys %store ) {
        while ( my ($key2, $k3_ref ) = each %{$store{$key1}} ) {
            while (my ($key3, $stats_ref) = each %$k3_ref ) {
                if ($stats_ref->[1] < 2**32 and $stats_ref->[0] > 0 ) {
                    # Less than 2**32 octets: we can print this in one go
                    print $OUT join(",", $key1, $key2, @$stats_ref, $key3);
                } else { # ($stats_ref->[1] >= 2**32) # Only possibility
                    # Too many octets: we need to print multiple records
                    my $dpackets = $stats_ref->[0];
                    my $doctets  = $stats_ref->[1];
                    my $flows    = int($doctets / (2**32)); # we need to put out this many flows
                    die "Coding logic error" if $flows < 1;
                    warn "dOctets $doctets will wrap 2**32 bytes, we need to add $flows more records\n";
                    my $doct_per_flow = 2**32 - 1;
                    my $dpkt_per_flow = int((2**32 - 1) / $doctets * $dpackets); # keep the same packet/octet ratio
                    for my $rec (1 .. $flows) {
                        print $OUT join(",", $key1, $key2, $dpkt_per_flow, $doct_per_flow, $key3);
                        $dpackets -= $dpkt_per_flow;
                        $doctets  -= $doct_per_flow;
                    }
                    print $OUT join(",", $key1, $key2, $dpackets, $doctets, $key3);
                }
            }
        }
    }
    %store = ();
}
sub round_times {
    my $rec = shift;
    $rec->[0] -= ($rec->[0] % $interval);        # Round down to nearest interval
    $rec->[1]  = 0;                              # Ensure nanoseconds is 0
    $rec->[2] -= ($rec->[2] % ($interval*100));  # Round down to nearest $interval centiseconds
    $rec->[6] -= ($rec->[6] % ($interval*100));  # Round down to nearest $interval centiseconds
    $rec->[7] -= ($rec->[7] % ($interval*100));  # Round down to nearest $interval centiseconds
}
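# eval_aggregation_sub compiles apply_agg once, at start-up, from the -l
# string, so the per-record loop never has to re-test $level. As a sketch,
# -l 'aB' generates roughly:
#   sub apply_agg {
#       my $rec = shift;
#       $rec->[10] = "0.0.0.0";      # a: zero the source address
#       $rec->[11] =~ s/\.\d+$/.0/;  # B: zero last octet of the dest address
#   }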
sub eval_aggregation_sub {
    my $level = shift;
    my $level_code;
    $level_code  = q( sub apply_agg { );
    $level_code .= q( my $rec = shift; );
    $level_code .= q( $rec->[10] = "0.0.0.0";     ) if $level =~ /a/;
    $level_code .= q( $rec->[10] =~ s/\.\d+$/.0/; ) if $level =~ /A/;
    $level_code .= q( $rec->[11] = "0.0.0.0";     ) if $level =~ /b/;
    $level_code .= q( $rec->[11] =~ s/\.\d+$/.0/; ) if $level =~ /B/;
    $level_code .= q( $rec->[15] = "0";           ) if $level =~ /c/;
    $level_code .= q( $rec->[16] = "0";           ) if $level =~ /d/;
    $level_code .= q( $rec->[17] = "0";           ) if $level =~ /e/;
    $level_code .= q( $rec->[12] = "0.0.0.0";
                      $rec->[24] = "0.0.0.0\n";
                      @{$rec}[1,2,6..9,13,14,18..23] = (0) x 14; ) if $level =~ /N/;
    # -g: if one side looks like the service port (known, or privileged),
    # zero the other, ephemeral side.
    $level_code .= q(
        $rec->[15] = 0 if (($known{$rec->[16]} or $rec->[16] < 1025) and $rec->[15] > 1024);
        $rec->[16] = 0 if (($known{$rec->[15]} or $rec->[15] < 1025) and $rec->[16] > 1024);
    ) if $guess;
    $level_code .= q( };1; );
    eval $level_code or die "Cannot eval\n$level_code\n$@ $!";
}
sub load_known_ports {
    my @raw;
    if ($known_file) {
        open my $IN, "<", $known_file or die "Cannot read $known_file for ports: $!";
        @raw = <$IN>;
        close $IN;
    } else {
        @raw = <DATA>;
    }
    # I used to add the 1..1024 ports into the hash, but it is slower than
    # a separate <1025 test.
    my @ports = (); # (1..1024);
    for (@raw) {
        chomp;
        s/#.*//; # remove comments from the record
        push @ports, grep { $_ += 0; $_ > 1024 } split /[,\s]+/;
    }
    @ports or warn "$known_file did not contain any new ports\n";
    @known{@ports} = (1) x @ports; # make hash keys
}
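# The default 'known' ports follow in __DATA__: port numbers separated by
# commas and/or whitespace, with '#' comments allowed. Anything <= 1024 is
# dropped here because the -g test already treats all privileged ports as
# known.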
__DATA__
# Assorted AD listening ports
1025,1026,1031,1072,1168,2301,2381,2701,2702
3268,3269,3389,13722,13724,13782,13783,49400
1026, # MS Messenger
1066, # VoIP Control
1270, # MS Ops Manager
1420, # Timbuktu Mac thing
1433, # MS SQL Server
1477, # SNA Server
1478, # SNA Server
1494, # Citrix
2065, # DLSW
2067, # DLSW
3389, # Terminal services
6800, 6900, 9000, # VoIP traffic
8080, # Web Browsing
8999, # WebApps Stra
9100, # Printer PDL Data Stream