Ram has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/62403


Change subject: Added notes in README on usage.
......................................................................

Added notes in README on usage.

Added usage message, additional options for specifying output file name
or output pipe command; trim whitespace from arguments; additional sanity
checks on arguments.

Change-Id: I91b2fc2251fe4acd912fd2b4cfccfbd2db55cd08
---
M README
M src/multiplexor.c
2 files changed, 209 insertions(+), 27 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/udp-filters refs/changes/03/62403/1

diff --git a/README b/README
index e3732b6..f8df37a 100644
--- a/README
+++ b/README
@@ -54,7 +54,7 @@
 =Background=
 This new filter system replaces the old collection of filters written in C. 
 
-=Command line arguments=
+=Command line arguments for udp-filter=
 The following is a list of valid command line parameters. 
 
 Either --path or --domain is mandatory (you can use them both); the other command line parameters are optional:
@@ -79,7 +79,7 @@
 For options -u, -p and -c you can enter multiple values if you separate them 
by 
 a comma.
 
-=Examples=
+=Examples for udp-filter=
 
 
 ./udp_filter -d en.wikipedia            this will log all pageviews (depending 
@@ -125,5 +125,35 @@
 
 ./udp_filter -i 71.190.22.0/24,2607:f0d0:1002:51::/64          this will 
filter for logs with IP addresses that match the given CIDR ranges.  IPv4 and 
IPv6 CIDR blocks are supported.
 
+=Description of multiplexor=
+The multiplexor (src/multiplexor.c) is a standalone program that creates a set of child
+processes (all running the same command), then reads lines from stdin and writes each
+line to one of the children in round-robin fashion. It is intended to read from processes
+like udp2log that have limited buffering and tend to lose data if their output stream is not
+read fast enough. So it provides two benefits that help minimize data loss:
+(a) Improves throughput when spare CPU capacity is available in additional cores.
+(b) Provides additional buffering when transient spikes slow down a single child.
+
+=Command line arguments for multiplexor=
+-cmd <cmd>   -- command to run in subprocesses (default: none)
+[-proc n]    -- number of child processes to create (default: 2)
+[-lines n]   -- max. lines to process (default: 0 = infinite)
+[-o <path>]  -- path to output files (default: /var/tmp/multiplexor_)
+[-p <cmd>]   -- path to output pipe command (default: none)
+-cmd is required; at most one of -o and -p may be given
+
+=Examples for multiplexor=
+The udp2log config file can contain lines like this:
+
+# create 2 children with output going to /var/tmp/data_* files; each child runs udp-filter with
+# the given arguments
+#
+pipe 1 /usr/bin/multiplexor -o /var/tmp/data_ -proc 2 -cmd "/usr/bin/udp-filter -F '\t' -i 189.40.0.0/16"
+
+# create 4 children each running udp-filter; output from each is piped to another process
+# running /usr/local/bin/foo which deals with the data as it sees fit.
+#
+pipe 1 /usr/bin/multiplexor -p /usr/local/bin/foo -proc 4 -cmd "/usr/bin/udp-filter -F '\t' -i 189.40.0.0/16"
+
 Acknowledgements
 Many thanks to Roan Kattouw and Tim Starling for showing me the way around in 
C.
diff --git a/src/multiplexor.c b/src/multiplexor.c
index caa6801..67a6b1c 100644
--- a/src/multiplexor.c
+++ b/src/multiplexor.c
@@ -1,9 +1,10 @@
 // a multiplexor that reads lines from stdin and round-robin them to a pool of 
processes
 // (the intent is to read as fast as possible so that the process at the other 
end of
-// the pipe doesn't block
+// the pipe doesn't block)
 //
 #include <stdio.h>
 #include <stdlib.h>
+#include <ctype.h>
 #include <errno.h>
 #include <string.h>
 
@@ -13,26 +14,105 @@
 
 char buf[ LINE_BUF_SZ ], cmd_buf[ CMD_BUF_SZ ];
 
+// defaults
+char const *const DEF_OUTPUT_FILE = "/var/tmp/multiplexor_";
+int const DEF_PROC = 2;
+int const DEF_LINES = 0;
 
 // commandline arguments
 struct args {
-       int n_proc, n_lines;    // no. child processes, no. lines to read (0 = 
infinite)
-       char const *cmd;
+       int  n_proc;            // no. of child processes
+       long n_lines;           // no. of lines to read (0 = infinite)
+       char const
+               *o_path,     // path to output file name
+               *p_path,     // path to pipe command name
+               *cmd;        // path to subprocess command name
 } args;
+
+// bit flags to record which options were seen
+enum { N_PROC      = 1,
+       N_LINES     = (N_PROC << 1),
+       COMMAND     = (N_LINES << 1),
+       OUTPUT_FILE = (COMMAND << 1),
+       OUTPUT_PIPE = (OUTPUT_FILE << 1)
+};
+
+void usage() {
+       fprintf( stderr, "Options:\n"
+                "-cmd <cmd>   -- command to run in subprocesses (default: 
none)\n"
+                "[-proc n]    -- number of child processes to create (default: 
%d)\n"
+                "[-lines n]   -- max. lines to process (default: 0 = 
infinite)\n"
+                "[-o <path>]  -- path to output files (default: %s)\n"
+                "[-p <cmd>]   -- path to output pipe command (default: none)\n"
+                "Only one of -o and -p is allowed; -cmd is required\n",
+                DEF_PROC, DEF_OUTPUT_FILE );
+}  // usage
+
+// write address of malloc'ed copy of s to *d; the copy has whitespace at both ends removed.
+// function returns size of new string; if this is zero, no malloc is done and *d is not
+// modified
+//
+int trim( char const *const s, int const s_len, char const **d ) {
+       if ( ! s_len )       // s is empty
+               return 0;
+
+       char const *p = s, *s_start = NULL;
+
+       for ( ; ; ++p ) {    // find first non-blank char
+               int const c = (0xff & *p);    // mask off any sign extension
+               if ( ! c )
+                       break;                // end of string
+               if ( isspace( c ) )           // skip white space
+                       continue;
+               s_start = p++;                // found beginning
+               break;
+       }
+       if ( NULL == s_start )                // s is blank
+               return 0;
+
+       // s is not blank
+       p = s + s_len - 1;                    // pointer to last char of s
+       for ( ; ; --p ) {                     // find last non-blank char
+               int const c = *p & 0xff;      // mask off any sign extension
+               if ( ! c ) {                  // null byte, should never happen
+                       fprintf( stderr, "Unexpected null byte" );
+                       exit( 1 );
+               }
+               if ( ! isspace( c ) )         // found end
+                       break;
+       }
+
+       int const len     = p - s_start + 1,            // length of new string
+                 n_bytes = len + 1;                    // null byte
+       char *const dest = (char *)malloc( n_bytes );
+       if ( NULL == dest ) {
+               fprintf( stderr, "malloc(%d) failed\n", n_bytes );
+               exit( 1 );
+       }
+       memcpy( dest, s_start, len );
+       *d = dest;
+       return len;
+}  // trim
 
 void parse_args(  int const argc, char *const *const argv ) {
        if ( 1 == argc ) {    // no arguments, use defaults
                return;
        }
 
-       int k;
-       char const *arg;
+       long n;
+       int k, len, arg_flags = 0;
+       char const *arg, *path;
        for ( int i = 1; i < argc; ++i ) {
                arg = argv[ i ];
                if ( !strcmp( "-proc", arg ) ) {
+                       if ( N_PROC & arg_flags ) {
+                               fprintf( stderr, "Duplicate -proc option\n" );
+                               exit( 1 );
+                       } 
+                       arg_flags |= N_PROC;
                        ++i;
                        if ( argc == i ) {
-                               fprintf( stderr, "Missing argument after 
-proc\n" );
+                               fprintf( stderr, "Missing argument after 
-proc\n" ); usage();
                                exit( 1 );
                        }
                        k = atoi( argv[ i ] );
@@ -46,36 +126,98 @@
                        }
                        args.n_proc = k;
                } else if ( !strcmp( "-lines", arg ) ) {
+                       if ( N_LINES & arg_flags ) {
+                               fprintf( stderr, "Duplicate -lines option\n" );
+                               exit( 1 );
+                       } 
+                       arg_flags |= N_LINES;
                        ++i;
                        if ( argc == i ) {
                                fprintf( stderr, "Missing argument after 
-lines\n" );
                                exit( 1 );
                        }
-                       k = atoi( argv[ i ] );
-                       if ( k < 0 ) {
-                               fprintf( stderr, "-lines argument too small: 
%d\n", k );
+                       n = atol( argv[ i ] );
+                       if ( n < 0 ) {
+                               fprintf( stderr, "-lines argument too small: 
%ld\n", n );
                                exit( 1 );
                        }
-                       args.n_lines = k;
+                       args.n_lines = n;
                } else if ( !strcmp( "-cmd", arg ) ) {
+                       if ( COMMAND & arg_flags ) {
+                               fprintf( stderr, "Duplicate -cmd option\n" );
+                               exit( 1 );
+                       } 
+                       arg_flags |= COMMAND;
                        ++i;
                        if ( argc == i ) {
                                fprintf( stderr, "Missing argument after 
-cmd\n" );
                                exit( 1 );
                        }
-                       arg = argv[ i ];
-                       k = strlen( arg );
-                       if ( 0 == k ) {
+                       path = argv[ i ];
+                       k = strlen( path );
+                       if ( ! k ) {
                                fprintf( stderr, "-cmd argument empty\n" );
                                exit( 1 );
                        }
-                       if ( k > CMD_BUF_SZ - 20 ) {
-                               fprintf( stderr, "-cmd argument too long: 
%d\n", k );
+                       len = trim( path, k, &args.cmd );
+                       if ( ! len ) {
+                               fprintf( stderr, "-cmd argument blank\n" );
                                exit( 1 );
                        }
-                       args.cmd = arg;
+               } else if ( !strcmp( "-o", arg ) ) {
+                       if ( OUTPUT_FILE & arg_flags ) {
+                               fprintf( stderr, "Duplicate -o option\n" );
+                               exit( 1 );
+                       } 
+                       if ( OUTPUT_PIPE & arg_flags ) {
+                               fprintf( stderr, "Cannot use both -p and -o 
options\n" );
+                               exit( 1 );
+                       } 
+                       arg_flags |= OUTPUT_FILE;
+                       ++i;
+                       if ( argc == i ) {
+                               fprintf( stderr, "Missing argument after -o\n" 
); usage();
+                               exit( 1 );
+                       }
+                       path = argv[ i ];
+                       k = strlen( path );
+                       if ( ! k ) {
+                               fprintf( stderr, "-o argument empty\n" );
+                               exit( 1 );
+                       }
+                       len = trim( path, k, &args.o_path );
+                       if ( ! len ) {
+                               fprintf( stderr, "-o argument blank\n" );
+                               exit( 1 );
+                       }
+               } else if ( !strcmp( "-p", arg ) ) {
+                       if ( OUTPUT_PIPE & arg_flags ) {
+                               fprintf( stderr, "Duplicate -p option\n" );
+                               exit( 1 );
+                       } 
+                       if ( OUTPUT_FILE & arg_flags ) {
+                               fprintf( stderr, "Cannot use both -o and -p 
options\n" );
+                               exit( 1 );
+                       } 
+                       arg_flags |= OUTPUT_PIPE;
+                       ++i;
+                       if ( argc == i ) {
+                               fprintf( stderr, "Missing argument after -p\n" 
); usage();
+                               exit( 1 );
+                       }
+                       path = argv[ i ];
+                       k = strlen( path );
+                       if ( ! k ) {
+                               fprintf( stderr, "-p argument empty\n" );
+                               exit( 1 );
+                       }
+                       len = trim( path, k, &args.p_path );
+                       if ( ! len ) {
+                               fprintf( stderr, "-p argument blank\n" );
+                               exit( 1 );
+                       }
                } else {
-                       fprintf( stderr, "Unknown argument: %s\n", arg );
+                       fprintf( stderr, "Unknown argument: %s\n", arg ); 
usage();
                        exit( 1 );
                }
        }  // while
@@ -85,17 +227,22 @@
                fprintf( stderr, "Missing -cmd option\n" );
                exit( 1 );
        }
-       printf( "n_proc = %d, n_lines = %d\ncmd = %s\n",
-                args.n_proc, args.n_lines, args.cmd );
+       printf( "n_proc = %d, n_lines = %ld\ncommand = %s\no_path = %s\np_path 
= %s\n",
+               args.n_proc, args.n_lines, args.cmd, args.o_path, args.p_path );
 }  // parse_args
 
-int main( int argc, char **argv ) {    // needs no arguments
+int main( int argc, char **argv ) {
        // defaults
-       args.n_proc = 2;
-       args.n_lines = 0;  // infinite
+       args.n_proc = DEF_PROC;
+       args.n_lines = DEF_LINES;  // infinite
        args.cmd = NULL;
 
        parse_args( argc, argv );
+
+       // set default output file name if necessary
+       if ( !(args.o_path || args.p_path) ) {
+               args.o_path = DEF_OUTPUT_FILE;
+       }
 
        // allocate space for output file pointers
        int const nbytes = args.n_proc * sizeof( FILE * );
@@ -110,8 +257,13 @@
        FILE *const *const optr_end = optr + args.n_proc;
        for ( int i = 0; optr < optr_end; ++i, ++optr ) {
                // create child process
-               sprintf( cmd_buf, "%s >> out_%d.log", args.cmd, i );
-               printf( "cmd = %s\n", cmd_buf );
+               if ( args.o_path ) {    // file path
+                       sprintf( cmd_buf, "%s >> %s_%d.txt", args.cmd, 
args.o_path, i );
+               } else {                // pipe command path
+                       sprintf( cmd_buf, "%s | %s", args.cmd, args.p_path );
+               }
+               //printf( "cmd = %s\n", cmd_buf );
+
                 errno = 0;     // popen() does not set this for some errors
                *optr = popen( cmd_buf, "w" );
                if ( NULL == *optr ) {
@@ -122,7 +274,7 @@
                        exit( 1 );
                }
        }
-       printf( "Created %d children\n", args.n_proc );
+       //printf( "Created %d children\n", args.n_proc );
 
        // multiplex data
        optr = ofiles;

-- 
To view, visit https://gerrit.wikimedia.org/r/62403
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I91b2fc2251fe4acd912fd2b4cfccfbd2db55cd08
Gerrit-PatchSet: 1
Gerrit-Project: analytics/udp-filters
Gerrit-Branch: master
Gerrit-Owner: Ram <r...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to