Ram has uploaded a new change for review. https://gerrit.wikimedia.org/r/62403
Change subject: Added notes in README on usage. ...................................................................... Added notes in README on usage. Added usage message, additional options for specifying output file name or output pipe command; trim whitespace from arguments; additional sanity checks on arguments. Change-Id: I91b2fc2251fe4acd912fd2b4cfccfbd2db55cd08 --- M README M src/multiplexor.c 2 files changed, 209 insertions(+), 27 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/udp-filters refs/changes/03/62403/1 diff --git a/README b/README index e3732b6..f8df37a 100644 --- a/README +++ b/README @@ -54,7 +54,7 @@ =Background= This new filter system replaces the old collection of filters written in C. -=Command line arguments= +=Command line arguments for udp-filter= The following is a list of valid command line parameters. Either --path or --domain are mandatory (you can use them both, the other command line parameters are optional: @@ -79,7 +79,7 @@ For options -u, -p and -c you can enter multiple values if you separate them by a comma. -=Examples= +=Examples for udp-filter= ./udp_filter -d en.wikipedia this will log all pageviews (depending @@ -125,5 +125,35 @@ ./udp_filter -i 71.190.22.0/24,2607:f0d0:1002:51::/64 this will filter for logs with IP addresses that match the given CIDR ranges. IPv4 and IPv6 CIDR blocks are supported. +=Description of multiplexor= +The multiplexor (src/multiplexor.c) is a standalone program that creates a set of child +processes (all running the same command), then reads lines from stdin and writes each +line to one of the children in round-robin fashion. It is intended to read from processes +like udp2log that have limited buffering and tend to lose data if their output stream is not +read fast enough. It provides two benefits that help minimize data loss: +(a) Improves throughput when spare CPU capacity is available in additional cores. 
+(b) Provides additional buffering when transient spikes slow down a single child. + +=Command line arguments for multiplexor= +-cmd <cmd> -- command to run in subprocesses (default: none) +[-proc n] -- number of child processes to create (default: 2) +[-lines n] -- max. lines to process (default: 0 = infinite) +[-o <path>] -- prefix of output files; child i appends to <path>_i.txt (default: /var/tmp/multiplexor_) +[-p <cmd>] -- path to output pipe command (default: none) +-cmd is required; only one of -o and -p is allowed + +=Examples for multiplexor= +The udp2log config file can contain lines like this: + +# create 2 children, each running udp-filter with the given arguments; output goes to +# /var/tmp/data__0.txt and /var/tmp/data__1.txt +# +pipe 1 /usr/bin/multiplexor -o /var/tmp/data_ -proc 2 -cmd "/usr/bin/udp-filter -F '\t' -i 189.40.0.0/16" + +# create 4 children each running udp-filter; the output of each child is piped to its own +# instance of /usr/local/bin/foo, which deals with the data as it sees fit. +# +pipe 1 /usr/bin/multiplexor -p /usr/local/bin/foo -proc 4 -cmd "/usr/bin/udp-filter -F '\t' -i 189.40.0.0/16" + Acknowledgements Many thanks to Roan Kattouw and Tim Starling for showing me the way around in C. diff --git a/src/multiplexor.c b/src/multiplexor.c index caa6801..67a6b1c 100644 --- a/src/multiplexor.c +++ b/src/multiplexor.c @@ -1,9 +1,10 @@ // a multiplexor that reads lines from stdin and round-robin them to a pool of processes // (the intent is to read as fast as possible so that the process at the other end of -// the pipe doesn't block +// the pipe doesn't block) // #include <stdio.h> #include <stdlib.h> +#include <ctype.h> #include <errno.h> #include <string.h> @@ -13,26 +14,105 @@ char buf[ LINE_BUF_SZ ], cmd_buf[ CMD_BUF_SZ ]; +// defaults +char const *const DEF_OUTPUT_FILE = "/var/tmp/multiplexor_"; +int const DEF_PROC = 2; +int const DEF_LINES = 0; // commandline arguments struct args { - int n_proc, n_lines; // no. child processes, no. 
lines to read (0 = infinite) - char const *cmd; + int n_proc; // no. of child processes + long n_lines; // no. of lines to read (0 = infinite) + char const + *o_path, // path to output file name + *p_path, // path to pipe command name + *cmd; // path to subprocess command name } args; + +// bit flags to record which options were seen +enum { N_PROC = 1, + N_LINES = (N_PROC << 1), + COMMAND = (N_LINES << 1), + OUTPUT_FILE = (COMMAND << 1), + OUTPUT_PIPE = (OUTPUT_FILE << 1) +}; + +void usage() { + fprintf( stderr, "Options:\n" + "-cmd <cmd> -- command to run in subprocesses (default: none)\n" + "[-proc n] -- number of child processes to create (default: %d)\n" + "[-lines n] -- max. lines to process (default: 0 = infinite)\n" + "[-o <path>] -- path to output files (default: %s)\n" + "[-p <cmd>] -- path to output pipe command (default: none)\n" + "Only one of -o and -p is allowed; -cmd is required\n", + DEF_PROC, DEF_OUTPUT_FILE ); +} // usage + +// write address of malloc'ed copy of s to *d; the copy has whitespace at both ends removed. +// function returns size of new string; if this is zero, no malloc is done and *d is not +// modified +// +int trim( char const *const s, int const s_len, char const **d ) { + if ( ! s_len ) // s is empty + return 0; + + char const *p = s, *s_start = NULL; + + for ( ; ; ++p ) { // find first non-blank char + int const c = (0xff & *p); // mask off any sign extension + if ( ! c ) + break; // end of string + if ( isspace( c ) ) // skip white space + continue; + s_start = p++; // found beginning + break; + } + if ( NULL == s_start ) // s is blank + return 0; + + // s is not blank + p = s + s_len - 1; // pointer to last char of s + for ( ; ; --p ) { // find last non-blank char + int const c = *p & 0xff; // mask off any sign extension + if ( ! c ) { // null byte, should never happen + fprintf( stderr, "Unexpected null byte" ); + exit( 1 ); + } + if ( ! 
isspace( c ) ) // found end + break; + } + + int const len = p - s_start + 1, // length of new string + n_bytes = len + 1; // null byte + char *const dest = (char *)malloc( n_bytes ); + if ( NULL == dest ) { + fprintf( stderr, "malloc(%d) failed\n", n_bytes ); + exit( 1 ); + } + memcpy( dest, s_start, len ); + *d = dest; + return len; +} // trim void parse_args( int const argc, char *const *const argv ) { if ( 1 == argc ) { // no arguments, use defaults return; } - int k; - char const *arg; + long n; + int k, len, arg_flags = 0; + char const *arg, *path; for ( int i = 1; i < argc; ++i ) { arg = argv[ i ]; if ( !strcmp( "-proc", arg ) ) { + if ( N_PROC & arg_flags ) { + fprintf( stderr, "Duplicate -proc option\n" ); + exit( 1 ); + } + arg_flags |= N_PROC; ++i; if ( argc == i ) { - fprintf( stderr, "Missing argument after -proc\n" ); + fprintf( stderr, "Missing argument after -proc\n" ); usage(); exit( 1 ); } k = atoi( argv[ i ] ); @@ -46,36 +126,98 @@ } args.n_proc = k; } else if ( !strcmp( "-lines", arg ) ) { + if ( N_LINES & arg_flags ) { + fprintf( stderr, "Duplicate -lines option\n" ); + exit( 1 ); + } + arg_flags |= N_LINES; ++i; if ( argc == i ) { fprintf( stderr, "Missing argument after -lines\n" ); exit( 1 ); } - k = atoi( argv[ i ] ); - if ( k < 0 ) { - fprintf( stderr, "-lines argument too small: %d\n", k ); + n = atol( argv[ i ] ); + if ( n < 0 ) { + fprintf( stderr, "-lines argument too small: %ld\n", n ); exit( 1 ); } - args.n_lines = k; + args.n_lines = n; } else if ( !strcmp( "-cmd", arg ) ) { + if ( COMMAND & arg_flags ) { + fprintf( stderr, "Duplicate -cmd option\n" ); + exit( 1 ); + } + arg_flags |= COMMAND; ++i; if ( argc == i ) { fprintf( stderr, "Missing argument after -cmd\n" ); exit( 1 ); } - arg = argv[ i ]; - k = strlen( arg ); - if ( 0 == k ) { + path = argv[ i ]; + k = strlen( path ); + if ( ! 
k ) { fprintf( stderr, "-cmd argument empty\n" ); exit( 1 ); } - if ( k > CMD_BUF_SZ - 20 ) { - fprintf( stderr, "-cmd argument too long: %d\n", k ); + len = trim( path, k, &args.cmd ); + if ( ! len ) { + fprintf( stderr, "-cmd argument blank\n" ); exit( 1 ); } - args.cmd = arg; + } else if ( !strcmp( "-o", arg ) ) { + if ( OUTPUT_FILE & arg_flags ) { + fprintf( stderr, "Duplicate -o option\n" ); + exit( 1 ); + } + if ( OUTPUT_PIPE & arg_flags ) { + fprintf( stderr, "Cannot use both -p and -o options\n" ); + exit( 1 ); + } + arg_flags |= OUTPUT_FILE; + ++i; + if ( argc == i ) { + fprintf( stderr, "Missing argument after -o\n" ); usage(); + exit( 1 ); + } + path = argv[ i ]; + k = strlen( path ); + if ( ! k ) { + fprintf( stderr, "-o argument empty\n" ); + exit( 1 ); + } + len = trim( path, k, &args.o_path ); + if ( ! len ) { + fprintf( stderr, "-o argument blank\n" ); + exit( 1 ); + } + } else if ( !strcmp( "-p", arg ) ) { + if ( OUTPUT_PIPE & arg_flags ) { + fprintf( stderr, "Duplicate -p option\n" ); + exit( 1 ); + } + if ( OUTPUT_FILE & arg_flags ) { + fprintf( stderr, "Cannot use both -o and -p options\n" ); + exit( 1 ); + } + arg_flags |= OUTPUT_PIPE; + ++i; + if ( argc == i ) { + fprintf( stderr, "Missing argument after -p\n" ); usage(); + exit( 1 ); + } + path = argv[ i ]; + k = strlen( path ); + if ( ! k ) { + fprintf( stderr, "-p argument empty\n" ); + exit( 1 ); + } + len = trim( path, k, &args.p_path ); + if ( ! 
len ) { + fprintf( stderr, "-p argument blank\n" ); + exit( 1 ); + } } else { - fprintf( stderr, "Unknown argument: %s\n", arg ); + fprintf( stderr, "Unknown argument: %s\n", arg ); usage(); exit( 1 ); } } // while @@ -85,17 +227,22 @@ fprintf( stderr, "Missing -cmd option\n" ); exit( 1 ); } - printf( "n_proc = %d, n_lines = %d\ncmd = %s\n", - args.n_proc, args.n_lines, args.cmd ); + printf( "n_proc = %d, n_lines = %ld\ncommand = %s\no_path = %s\np_path = %s\n", + args.n_proc, args.n_lines, args.cmd, args.o_path, args.p_path ); } // parse_args -int main( int argc, char **argv ) { // needs no arguments +int main( int argc, char **argv ) { // defaults - args.n_proc = 2; - args.n_lines = 0; // infinite + args.n_proc = DEF_PROC; + args.n_lines = DEF_LINES; // infinite args.cmd = NULL; parse_args( argc, argv ); + + // set default output file name if necessary + if ( !(args.o_path || args.p_path) ) { + args.o_path = DEF_OUTPUT_FILE; + } // allocate space for output file pointers int const nbytes = args.n_proc * sizeof( FILE * ); @@ -110,8 +257,13 @@ FILE *const *const optr_end = optr + args.n_proc; for ( int i = 0; optr < optr_end; ++i, ++optr ) { // create child process - sprintf( cmd_buf, "%s >> out_%d.log", args.cmd, i ); - printf( "cmd = %s\n", cmd_buf ); + if ( args.o_path ) { // file path + sprintf( cmd_buf, "%s >> %s_%d.txt", args.cmd, args.o_path, i ); + } else { // pipe command path + sprintf( cmd_buf, "%s | %s", args.cmd, args.p_path ); + } + //printf( "cmd = %s\n", cmd_buf ); + errno = 0; // popen() does not set this for some errors *optr = popen( cmd_buf, "w" ); if ( NULL == *optr ) { @@ -122,7 +274,7 @@ exit( 1 ); } } - printf( "Created %d children\n", args.n_proc ); + //printf( "Created %d children\n", args.n_proc ); // multiplex data optr = ofiles; -- To view, visit https://gerrit.wikimedia.org/r/62403 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I91b2fc2251fe4acd912fd2b4cfccfbd2db55cd08 
Gerrit-PatchSet: 1 Gerrit-Project: analytics/udp-filters Gerrit-Branch: master Gerrit-Owner: Ram <r...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits