Revision: 80 Author: matt Date: 2006-08-13 02:19:05 +0000 (Sun, 13 Aug 2006)
Log Message: ----------- Console server and various cleanups/fixes Modified Paths: -------------- trunk/lib/AxKit2/Config/Global.pm trunk/lib/AxKit2/Config.pm trunk/lib/AxKit2/Connection.pm trunk/lib/AxKit2/Processor.pm trunk/lib/AxKit2/Server.pm trunk/lib/AxKit2.pm Added Paths: ----------- trunk/lib/AxKit2/Console.pm Modified: trunk/lib/AxKit2/Config/Global.pm =================================================================== --- trunk/lib/AxKit2/Config/Global.pm 2006-08-13 02:18:36 UTC (rev 79) +++ trunk/lib/AxKit2/Config/Global.pm 2006-08-13 02:19:05 UTC (rev 80) @@ -22,6 +22,18 @@ $self->{DocumentRoot}; } +sub console_port { + my $self = shift; + @_ and $self->{ConsolePort} = shift; + $self->{ConsolePort}; +} + +sub console_addr { + my $self = shift; + @_ and $self->{ConsoleAddr} = shift; + $self->{ConsoleAddr}; +} + sub styleroot { my $self = shift; @_ and $self->{StylesheetRoot} = shift; @@ -58,4 +70,4 @@ $self->{Notes}{$key}; } -1; \ No newline at end of file +1; Modified: trunk/lib/AxKit2/Config.pm =================================================================== --- trunk/lib/AxKit2/Config.pm 2006-08-13 02:18:36 UTC (rev 79) +++ trunk/lib/AxKit2/Config.pm 2006-08-13 02:19:05 UTC (rev 80) @@ -14,8 +14,9 @@ Plugin => [\&TAKE1, sub { my $conf = shift; AxKit2::Client->load_plugin($conf, $_[0]); $conf->add_plugin($_[0]); }], Port => [\&TAKE1, sub { my $conf = shift; $conf->port($_[0]) }], DocumentRoot => [\&TAKE1, sub { my $conf = shift; $conf->docroot($_[0]) }], - StylesheetRoot => [\&TAKE1, sub { my $conf = shift; $conf->styleroot($_[0]) }], DirectoryIndex => [\&TAKE1, sub { my $conf = shift; $conf->dirindex($_[0]) }], + ConsolePort => [\&TAKE1, sub { my $conf = shift; $conf->isa('AxKit2::Config::Global') || die "ConsolePort only allowed at global level"; $conf->console_port($_[0]) }], + ConsoleAddr => [\&TAKE1, sub { my $conf = shift; $conf->isa('AxKit2::Config::Global') || die "ConsoleAddr only allowed at global level"; $conf->console_addr($_[0]) }], ); 
our $GLOBAL = AxKit2::Config::Global->new(); @@ -190,4 +191,4 @@ return @vals; } -1; \ No newline at end of file +1; Modified: trunk/lib/AxKit2/Connection.pm =================================================================== --- trunk/lib/AxKit2/Connection.pm 2006-08-13 02:18:36 UTC (rev 79) +++ trunk/lib/AxKit2/Connection.pm 2006-08-13 02:19:05 UTC (rev 80) @@ -53,6 +53,12 @@ return $self; } +sub uptime { + my AxKit2::Connection $self = shift; + + return (time() - $self->{create_time}); +} + sub config { my AxKit2::Connection $self = shift; if ($self->{headers_in}) { @@ -246,6 +252,9 @@ return 1; } +sub DESTROY { +# print "Connection DESTROY\n"; +} # Cleanup routine to get rid of timed out sockets sub _do_cleanup { @@ -256,6 +265,8 @@ Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup); my $sf = __PACKAGE__->get_sock_ref; + + my $conns = 0; my %max_age; # classname -> max age (0 means forever) my %max_connect; # classname -> max connect time @@ -264,26 +275,24 @@ my AxKit2::Connection $v = $sf->{$k}; my $ref = ref $v; next unless $v->isa('AxKit2::Connection'); + $conns++; unless (defined $max_age{$ref}) { $max_age{$ref} = $ref->max_idle_time || 0; $max_connect{$ref} = $ref->max_connect_time || 0; } - AxKit2::Client->log(LOGDEBUG, "got a Connection. Max age: $max_age{$ref}, Max Connect: $max_connect{$ref}"); if (my $t = $max_connect{$ref}) { - AxKit2::Client->log(LOGDEBUG, "connection time: $v->{create_time} < " . ($now - $t)); if ($v->{create_time} < $now - $t) { push @to_close, $v; next; } } if (my $t = $max_age{$ref}) { - AxKit2::Client->log(LOGDEBUG, "alive time: $v->{alive_time} < " . 
($now - $t)); if ($v->{alive_time} < $now - $t) { push @to_close, $v; } } } - + $_->close("Timeout") foreach @to_close; } Added: trunk/lib/AxKit2/Console.pm =================================================================== --- trunk/lib/AxKit2/Console.pm 2006-08-13 02:18:36 UTC (rev 79) +++ trunk/lib/AxKit2/Console.pm 2006-08-13 02:19:05 UTC (rev 80) @@ -0,0 +1,314 @@ +package AxKit2::Console; + +use strict; +use warnings; + +use IO::Socket; +use AxKit2::Constants; +use Socket qw(IPPROTO_TCP TCP_NODELAY); + +use base 'Danga::Socket'; + +use fields qw( + alive_time + create_time + line + ); + +use constant CLEANUP_TIME => 5; # seconds + +our $PROMPT = "Enter command (or \"HELP\" for help)\n"; + +Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup); + +sub create { + my $class = shift; + my $config = shift; + + my $PORT = $config->console_port; + + return unless $PORT; + + my $sock = IO::Socket::INET->new( + LocalAddr => $config->console_addr || '127.0.0.1', + LocalPort => $PORT, + Proto => 'tcp', + Type => SOCK_STREAM, + Blocking => 0, + Reuse => 1, + Listen => SOMAXCONN ) + or die "Error creating server on port $PORT : $@\n"; + + IO::Handle::blocking($sock, 0); + + my $accept_handler = sub { + my $csock = $sock->accept; + return unless $csock; + + if ($::DEBUG) { + AxKit2::Client->log(LOGDEBUG, "Listen child making a AxKit2::Connection for ", fileno($csock)); + } + + IO::Handle::blocking($csock, 0); + setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die; + + if (my $client = eval { AxKit2::Console->new($csock, $config) }) { + $client->watch_read(1); + return; + } else { + die("Error creating new Console: $@") if $@; + } + }; + + Danga::Socket->AddOtherFds(fileno($sock) => $accept_handler); +} + +sub max_idle_time { 30 } +sub max_connect_time { 180 } +sub event_err { my AxKit2::Connection $self = shift; $self->close("Error") } +sub event_hup { my AxKit2::Connection $self = shift; $self->close("Disconnect (HUP)") } + +sub new { + my $self 
= shift; + my $sock = shift; + my $conf = shift; + $self = fields::new($self) unless ref($self); + + $self->SUPER::new($sock); + + my $now = time; + $self->{alive_time} = $self->{create_time} = $now; + $self->{line} = ''; + + $self->write($PROMPT); + + return $self; +} + +sub event_read { + my AxKit2::Console $self = shift; + $self->{alive_time} = time; + + my $bref = $self->read(8192); + return $self->close($!) unless defined $bref; + $self->process_read_buf($bref); +} + +sub process_read_buf { + my AxKit2::Console $self = shift; + my $bref = shift; + $self->{line} .= $$bref; + + while ($self->{line} =~ s/^(.*?\n)//) { + my $line = $1; + $self->process_line($line); + } +} + +sub process_line { + my AxKit2::Console $self = shift; + my $line = shift; + + $line =~ s/\r?\n//; + my ($cmd, @params) = split(/ +/, $line); + my $meth = "cmd_" . lc($cmd); + if (my $lookup = $self->can($meth)) { + $lookup->($self, @params); + $self->write($PROMPT); + } + else { + # No such method - i.e. unrecognized command + return $self->write("command '$cmd' unrecognised\n"); + } +} + +my %helptext; + +$helptext{help} = "HELP [CMD] - Get help on all commands or a specific command"; + +sub cmd_help { + my $self = shift; + my ($subcmd) = @_; + + $subcmd ||= 'help'; + $subcmd = lc($subcmd); + + if ($subcmd eq 'help') { + my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext)); + $self->write("Available Commands:\n\n$txt\n"); + } + my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list."; + $self->write("$txt\n"); +} + +$helptext{quit} = "QUIT - Exit the console"; +sub cmd_quit { + my $self = shift; + $self->close; +} + +$helptext{list} = "LIST [LIMIT] - List current connections, specify limit or negative limit to shrink list"; +sub cmd_list { + my $self = shift; + my ($count) = @_; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . 
" Connections: \n\n"; + my @all; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("AxKit2::Connection")) { + next unless $pob->peer_addr_string; # haven't even started yet + push @all, [$pob+0, $pob->peer_addr_string, $pob->uptime]; + } + } + + @all = sort { $a->[2] <=> $b->[2] } @all; + if ($count) { + if ($count > 0) { + @all = @all[$#all-($count-1) .. $#all]; + } + else { + @all = @all[0..(abs($count) - 1)]; + } + } + foreach my $item (@all) { + $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item); + } + + $self->write( $list ); +} + +$helptext{kill} = "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF"; +sub cmd_kill { + my $self = shift; + my ($match) = @_; + + return $self->write("SYNTAX: KILL (\$IP | \$REF)\n") unless $match; + + my $descriptors = Danga::Socket->DescriptorMap; + + my $killed = 0; + my $is_ip = (index($match, '.') >= 0); + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("Qpsmtpd::PollServer")) { + if ($is_ip) { + next unless $pob->connection->remote_ip; # haven't even started yet + if ($pob->connection->remote_ip eq $match) { + $pob->write("550 Your connection has been killed by an administrator\r\n"); + $pob->disconnect; + $killed++; + } + } + else { + # match by ID + if ($pob+0 == hex($match)) { + $pob->write("550 Your connection has been killed by an administrator\r\n"); + $pob->disconnect; + $killed++; + } + } + } + } + + $self->write("Killed $killed connection" . ($killed > 1 ? "s" : "") . 
"\n"); +} + +$helptext{dump} = "DUMP \$REF - Dump a connection using Data::Dumper"; +sub cmd_dump { + my $self = shift; + my ($ref) = @_; + + require Data::Dumper; + $Data::Dumper::Indent=1; + $Data::Dumper::Terse=1; + + my $descriptors = Danga::Socket->DescriptorMap; + foreach my $fd (keys %$descriptors) { + my $pob = $descriptors->{$fd}; + if ($pob->isa("AxKit2::Connection")) { + if ($pob+0 == hex($ref)) { + return $self->write( Data::Dumper::Dumper($pob) ); + } + } + } + + $self->write("Unable to find the connection: $ref. Try the LIST command\n"); +} + +sub DBI::FIRSTKEY {} + +$helptext{leaks} = "LEAKS [DUMP] - Run Devel::GC::Helper to list leaks with optional Dumper output"; +my %prev_leaks; +sub cmd_leaks { + my $self = shift; + my $dump = shift || ''; + $dump = (uc($dump) eq 'DUMP') ? 1 : 0; + + require Devel::GC::Helper; + if ($dump) { + require Data::Dumper; + $Data::Dumper::Terse = 1; + $Data::Dumper::Indent = 1; + #$Data::Dumper::Deparse = 1; + } + + my $pid = fork; + die "Can't fork" unless defined $pid; + return if $pid; + + # Child - run the leak sweep... + my $leaks = Devel::GC::Helper::sweep(); + foreach my $leak (@$leaks) { + $self->write("Leaked $leak\n"); + $self->write( Data::Dumper::Dumper($leak) ) if $dump; + } + $self->write( "Total leaks: " . scalar(@$leaks) . 
"\n"); + $self->write($PROMPT); + + exit; +} + +# Cleanup routine to get rid of timed out sockets +sub _do_cleanup { + my $now = time; + + # AxKit2::Client->log(LOGDEBUG, "do cleanup"); + + Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup); + + my $sf = __PACKAGE__->get_sock_ref; + + my $conns = 0; + + my %max_age; # classname -> max age (0 means forever) + my %max_connect; # classname -> max connect time + my @to_close; + while (my $k = each %$sf) { + my AxKit2::Connection $v = $sf->{$k}; + my $ref = ref $v; + next unless $v->isa('AxKit2::Console'); + $conns++; + unless (defined $max_age{$ref}) { + $max_age{$ref} = $ref->max_idle_time || 0; + $max_connect{$ref} = $ref->max_connect_time || 0; + } + if (my $t = $max_connect{$ref}) { + if ($v->{create_time} < $now - $t) { + push @to_close, $v; + next; + } + } + if (my $t = $max_age{$ref}) { + if ($v->{alive_time} < $now - $t) { + push @to_close, $v; + } + } + } + + $_->close("Timeout") foreach @to_close; +} + +1; Modified: trunk/lib/AxKit2/Processor.pm =================================================================== --- trunk/lib/AxKit2/Processor.pm 2006-08-13 02:18:36 UTC (rev 79) +++ trunk/lib/AxKit2/Processor.pm 2006-08-13 02:19:05 UTC (rev 80) @@ -106,6 +106,7 @@ print $fh ($dom || $self->dom)->toString; } ($dom, $outfunc) = $trans->transform($pos++, $self); + # $trans->client(undef); $self->dom($dom); } Modified: trunk/lib/AxKit2/Server.pm =================================================================== --- trunk/lib/AxKit2/Server.pm 2006-08-13 02:18:36 UTC (rev 79) +++ trunk/lib/AxKit2/Server.pm 2006-08-13 02:19:05 UTC (rev 80) @@ -9,15 +9,13 @@ use AxKit2::Constants; use AxKit2::Client; -our @servers; - sub create { my $class = shift; my $servconf = shift; my $PORT = $servconf->port; my $sock = IO::Socket::INET->new( - LocalPort => $servconf->port, + LocalPort => $PORT, Proto => 'tcp', Type => SOCK_STREAM, Blocking => 0, @@ -27,8 +25,6 @@ IO::Handle::blocking($sock, 0); - push @servers, $sock; - my 
$accept_handler = sub { my $csock = $sock->accept; return unless $csock; @@ -51,4 +47,4 @@ Danga::Socket->AddOtherFds(fileno($sock) => $accept_handler); } -1; \ No newline at end of file +1; Modified: trunk/lib/AxKit2.pm =================================================================== --- trunk/lib/AxKit2.pm 2006-08-13 02:18:36 UTC (rev 79) +++ trunk/lib/AxKit2.pm 2006-08-13 02:19:05 UTC (rev 80) @@ -5,6 +5,7 @@ use AxKit2::Client; use AxKit2::Server; use AxKit2::Config; +use AxKit2::Console; our $VERSION = '1.0'; @@ -16,6 +17,9 @@ local $SIG{'PIPE'} = "IGNORE"; # handled manually + # config server + AxKit2::Console->create(AxKit2::Config->global); + # setup server for my $server ($config->servers) { AxKit2::Server->create($server); @@ -24,4 +28,54 @@ Danga::Socket->EventLoop(); } -1; \ No newline at end of file +1; + +=head1 NAME + +AxKit2 - XML Application Server + +=head1 SYNOPSIS + +Just start the server: + + ./axkit + +To do anything complex read the documentation and start writing plugins. + +=head1 DESCRIPTION + +AxKit2 is the second generation XML Application Server following in the +footsteps of AxKit-1 (ONE). AxKit makes content generation easy by providing +powerful tools to push XML through stylesheets. This helps ensure your web +applications don't suffer from XSS bugs, and provides standardised templating +tools so that your template authors don't need to learn a new Perl templating +tool. + +=head1 PLUGINS + +Everything AxKit2 does is controlled by a plugin, and thus a lot of the +documentation for things that AxKit2 does is held within the plugin itself. + +=head1 Why 2.0? + +In creating AxKit2 the following goals were aimed for: + +=over 4 + +=item * Make it easier to set up and get started with than before. + +=item * Make it faster. + +=item * Make building complex web applications easier. + +=item * Make it easy to extend and hack on. + +=item * Make complex pipelines and caching schemes easier. 
+ +=back + +Many people wanted a straight port to Apache2/mod_perl2, so that they could +get their AxKit code migrated off the Apache1.x platform. This would have been +one route to go down, a route which we looked at very seriously. However it is +not the path we chose for a number of reasons which you can find in the AxKit +mailing list archives.