OK, here's the files as they currently stand. The big note on this is that I have done very little testing, and most importantly I have NOT updated any of the plugins (see the async/* dir for those that need re-written to use AnyEvent, mostly to use AnyEvent::DNS. Also the tls plugin would need updated/rewritten, though it's probably a lot simpler with AnyEvent).

I'll be trying to do performance testing tomorrow (comparing with qpsmtpd-async, NOT any of the other models).

Matt.
#!/usr/bin/perl

use lib "./lib";
BEGIN {
    delete $ENV{ENV};
    delete $ENV{BASH_ENV};
    $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
}

# Profiling - requires Devel::Profiler 0.05
#BEGIN { $Devel::Profiler::NO_INIT = 1; }
#use Devel::Profiler;

use strict;
use vars qw($DEBUG);
use FindBin qw();
# TODO: need to make this taint friendly
use lib "$FindBin::Bin/lib";
use Qpsmtpd::AnyEvent;
use Qpsmtpd::ConfigServer;
use Qpsmtpd::Constants;
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
use List::Util qw(shuffle);
use Socket;
use AnyEvent::Socket;
use AnyEvent::Util;

$|++;

$SIG{'PIPE'} = "IGNORE";  # handled manually

$DEBUG          = 0;

my $PORT        = 2525;
my $LOCALADDR   = '0.0.0.0';
my $PROCS       = 1;
my $USER        = (getpwuid $>)[0];         # user to suid to
   $USER        = "smtpd" if $USER eq "root";
my $PAUSED      = 0;
my $NUMACCEPT   = 20;
my $PID_FILE    = '';
my $ACCEPT_RSET;
my $DETACH;       # daemonize on startup

# make sure we don't spend forever doing accept()
use constant ACCEPT_MAX => 1000;

sub reset_num_accept {
    $NUMACCEPT = 20;
}

sub help {
    print <<EOT;
Usage:
    qpsmtpd [OPTIONS]

Options:
 -l, --listen-address addr : listen on a specific address; default 0.0.0.0
 -p, --port P              : listen on a specific port; default 2525
 -u, --user U              : run as a particular user; defualt 'smtpd'
 -j, --procs J             : spawn J processes; default 1
 -d, --detach              : detach from controlling terminal (daemonize)
     --pid-file P          : print main servers PID to file P
 
 -h, --help                : this page
EOT
    exit(0);
}

GetOptions(
    'p|port=i'              => \$PORT,
    'l|listen-address=s'    => \$LOCALADDR,
    'j|procs=i'             => \$PROCS,
    'v|verbose+'              => \$DEBUG,
    'u|user=s'              => \$USER,
    'pid-file=s'            => \$PID_FILE,
    'd|detach'              => \$DETACH,
    'h|help'                => \&help,
) || help();

# detaint the commandline
if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &help }
if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &help }
if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &help }
if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help }

use constant READY      => 1;
use constant ACCEPTING  => 2;
use constant RESTARTING => 999;

my $CURRENT_PROCS = 0;
my %childstatus = ();
my $SERVER;

if ($PID_FILE && -r $PID_FILE) {
    open PID, "<$PID_FILE"
        or die "open_pidfile $PID_FILE: $!\n";
    my $running_pid = <PID> || ''; chomp $running_pid;
    if ($running_pid =~ /^(\d+)/) {
        if (kill 0, $running_pid) {
            die "Found an already running qpsmtpd with pid $running_pid.\n";
        }
    }
    close(PID);
}

run_as_server();

exit(0);

sub spawn_child {
    my ($plugin_loader) = @_;
    
    my $pid = fork;
    
    if ($pid) {
        $childstatus{$pid}++;
        $CURRENT_PROCS++;
        return $pid;
    }
    elsif (!defined($pid)) {
        die "Unable to fork";
    }
    
    $plugin_loader->run_hooks('post-fork');
    
    accept_loop();
}

sub accept_loop {
    my $w = AE::io $SERVER, 0, sub {
        print "Connect?\n";
        while ($SERVER && (my $peer = accept my $fh, $SERVER)) {
             fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside 
world does not
    
             my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $peer;
             my $qp = Qpsmtpd::AnyEvent->new($fh, format_address($host), 
$service);
             $qp->process_line("Connect");
          }
    };

    AnyEvent->condvar->wait;
    exit;
}

sub sig_hup {
    kill 1, keys %childstatus;
}

sub sig_chld {
    my $spawn_count = 0;
    while ( (my $child = waitpid(-1,WNOHANG)) > 0) {
        if (!defined $childstatus{$child}) {
            next;
        }

        last unless $child > 0;
        print "SIGCHLD: child $child died\n";
        delete $childstatus{$child};
        $CURRENT_PROCS--;
    }
    
    $SIG{CHLD} = \&sig_chld;
}

sub HUNTSMAN {
    $SIG{CHLD} = 'DEFAULT';
    kill 'INT' => keys %childstatus;
    if ($PID_FILE && -e $PID_FILE) {
        unlink $PID_FILE or ::log(LOGERROR, "unlink: $PID_FILE: $!");
    }
    exit(0);
}

sub _connect_sock {
    my $ipn = parse_address($LOCALADDR) or die "cannot parse '$LOCALADDR' as 
host address";
    
    my $af = address_family $ipn;
    
    socket(my $sock, $af, SOCK_STREAM, 0) or die "socket: $!";
    setsockopt($sock, SOL_SOCKET, SO_REUSEADDR, 1)
        or die "tcp_server/so_reuseaddr: $!";

    bind $sock, AnyEvent::Socket::pack_sockaddr($PORT, $ipn)
        or die "bind: $!";
    
    fh_nonblocking($sock, 1);
    listen($sock, 128);
    
    return $sock;
}

sub run_as_server {
    AnyEvent::detect;
    
    $SERVER = _connect_sock();

    # Drop priviledges
    my (undef, undef, $quid, $qgid) = getpwnam $USER or
          die "unable to determine uid/gid for $USER\n";
    my $groups = "$qgid $qgid";
    while (my (undef, undef, $gid, $members) = getgrent) {
        my @m = split(/ /, $members);
        if (grep { $_ eq $USER } @m) {
            $groups .= " $gid";
        }
    }
    endgrent;
    $) = $groups;
    POSIX::setgid($qgid) or
          die "unable to change gid: $!\n";
    POSIX::setuid($quid) or
          die "unable to change uid: $!\n";
    $> = $quid;
    
    # Load plugins here
    my $plugin_loader = Qpsmtpd::SMTP->new();
    $plugin_loader->load_plugins;
    
    if ($DETACH) {
        open STDIN, '/dev/null' or die "/dev/null: $!";
        open STDOUT, '>/dev/null' or die "/dev/null: $!";
        open STDERR, '>&STDOUT' or die "open(stderr): $!";
        defined (my $pid = fork) or die "fork: $!";
        exit 0 if $pid;
        POSIX::setsid or die "setsid: $!";
    }

    if ($PID_FILE) {
        open PID, ">$PID_FILE" || die "$PID_FILE: $!";
        print PID $$,"\n";
        close PID;
    }
 
    $plugin_loader->log(LOGINFO, 'Running as user '.
        (getpwuid($>) || $>) .
        ', group '.
        (getgrgid($)) || $)));

    $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children");
    
    while (1) {
        if ($PROCS > 1) {
            $SIG{CHLD} = $SIG{HUP} = 'IGNORE';
            while ($CURRENT_PROCS < $PROCS) {
                spawn_child($plugin_loader);
            }
            $SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
            
            $SIG{CHLD} = \&sig_chld;
            $SIG{HUP}  = \&sig_hup;
            
            # multi-process mode, just sleep
            sleep;
            # got here because a signal received?
            print "Signal received...\n";
        }
        else {
            accept_loop;
        }
    }
}

sub log {
  my ($level,$message) = @_;
  # $level not used yet.  this is reimplemented from elsewhere anyway
  warn("$$ fd:? $message\n");
}
package Qpsmtpd::AnyEvent;

use base ('Qpsmtpd::SMTP');
use Qpsmtpd::Constants;
use Qpsmtpd::Address;
use Mail::Header;
use POSIX qw(strftime);
use Time::HiRes qw(time);
use AnyEvent;
use AnyEvent::DNS;
use AnyEvent::Handle;
use Socket;
use strict;

sub max_idle_time { 60 }
sub max_connect_time { 1200 }

sub input_sock {
    my $self = shift;
    @_ and die "Read only";
    $self->{handle}->fh;
}

sub new {
    my ($class, $fh, $host, $port) = @_;
    
    my $self = $class->SUPER::new();
    
    my $hdl = AnyEvent::Handle->new(
        fh => $fh,
        on_error => sub {
            my ($hdl, $fatal, $msg) = @_;
            warn "got error $msg\n";
            $hdl->destroy;
        }
    );
    
    $hdl->push_read(line => sub {
        my ($h, $line) = @_;
        $self->process_line($line);
    });
    
    $self->{start_time} = time;
    $self->{mode} = 'connect';
    $self->{handle} = $hdl;
    $self->{remote_host} = $host;
    $self->{remote_port} = $port;
    
    $self->load_plugins;
    $self->load_logging;

    my ($rc, @msg) = $self->run_hooks_no_respond("pre-connection");
    
    if ($rc == DENYSOFT || $rc == DENYSOFT_DISCONNECT) {
        @msg = ("Sorry, try again later")
          unless @msg;
        $self->respond(451, @msg);
        $self->disconnect;
    }
    elsif ($rc == DENY || $rc == DENY_DISCONNECT) {
        @msg = ("Sorry, service not available for you")
          unless @msg;
        $self->respond(550, @msg);
        $self->disconnect;
    }
    
    $self->reset_for_next_message();
    $self->{mode} = 'connect';
    
    return $self;
}

sub uptime {
    my $self = shift;
    
    return (time() - $self->{start_time});
}

sub reset_for_next_message {
    my $self = shift;
    
    $self->{_commands} = {
        ehlo => 1,
        helo => 1,
        rset => 1,
        mail => 1,
        rcpt => 1,
        data => 1,
        help => 1,
        vrfy => 1,
        noop => 1,
        quit => 1,
        auth => 0, # disabled by default
    };
    $self->{mode} = 'cmd';
    $self->{_extras} = {};
}

sub respond {
    my $self = shift;
    my ($code, @messages) = @_;
    while (my $msg = shift @messages) {
        my $line = $code . (@messages ? "-" : " ") . $msg;
        $self->{handle}->push_write("$line\r\n");
    }
    return 1;
}

# there's no "pause" needed on AnyEvent, because we are only interested 
# in reading after we write something, which is handled in respond() above.
sub continue_read {}
sub pause_read {}

my %cmd_cache;

sub process_line {
    my $self = shift;
    my $line = shift || return;
    if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line\n"; }
    if ($self->{mode} eq 'cmd') {
        $line =~ s/\r?\n$//s;
        $self->connection->notes('original_string', $line);
        my ($cmd, @params) = split(/ +/, $line, 2);
        my $meth = lc($cmd);
        if (my $lookup = $cmd_cache{$meth} || $self->{_commands}->{$meth} && $self->can($meth)) {
            $cmd_cache{$meth} = $lookup;
            eval {
                $lookup->($self, @params);
            };
            if ($@) {
                my $error = $@;
                chomp($error);
                $self->log(LOGERROR, "Command Error: $error");
                $self->fault("command '$cmd' failed unexpectedly");
            }
        }
        else {
            # No such method - i.e. unrecognized command
            my ($rc, $msg) = $self->run_hooks("unrecognized_command", $meth, @params);
        }
        $self->{handle}->push_read(line => sub { $self->process_line($_[1]) });
    }
    elsif ($self->{mode} eq 'connect') {
        $self->{mode} = 'cmd';
        # I've removed an eval{} from around this. It shouldn't ever die()
        # but if it does we're a bit screwed... Ah well :-)
        $self->start_conversation;
    }
    else {
        die "Unknown mode";
    }
    return;
}

sub disconnect {
    my $self = shift;
    $self->SUPER::disconnect(@_);
    $self->{handle}->destroy;
}

sub close {
    my $self = shift;
    $self->run_hooks_no_respond("post-connection");
    $self->connection->reset;
    $self->{handle}->destroy;
}

sub start_conversation {
    my $self = shift;
    
    my $conn = $self->connection;
    # set remote_host, remote_ip and remote_port
    my ($ip, $port) = $self->{remote_host}, $self->{remote_port};
    return $self->close() unless $ip;
    $conn->remote_ip($ip);
    $conn->remote_port($port);
    $conn->remote_info("[$ip]");
    
    my $pn = getsockname($self->{handle}->fh);
    my ($port, $iaddr) = Socket::sockaddr_in($pn);
    
    $self->{local_port} = $port;
    $self->{local_ip} = Socket::inet_ntoa($iaddr);

    $conn->local_ip($self->{local_ip});
    $conn->local_port($self->{local_port});
    
    print "Looking up IP: $ip (L: $self->{local_ip})\n";
    
    AnyEvent::DNS::reverse_lookup(
        $ip, sub {
            print "Reverse lookup returned: $_[0]\n";
            if (!...@_) {
                $conn->remote_info($conn->remote_host("DNSERROR"));
            }
            else {
                $conn->remote_info($conn->remote_host($_[0]));
            }
            $self->run_hooks("connect");
        }
    );
    
    return;
}

sub data {
    my $self = shift;
    
    my ($rc, $msg) = $self->run_hooks("data");
    return 1;
}

sub data_respond {
    my $self = shift;
    my ($rc, $msg) = @_;
    if ($rc == DONE) {
        return;
    }
    elsif ($rc == DENY) {
        $msg->[0] ||= "Message denied";
        $self->respond(554, @$msg);
        $self->reset_transaction();
        return;
    }
    elsif ($rc == DENYSOFT) {
        $msg->[0] ||= "Message denied temporarily";
        $self->respond(451, @$msg);
        $self->reset_transaction();
        return;
    } 
    elsif ($rc == DENY_DISCONNECT) {
        $msg->[0] ||= "Message denied";
        $self->respond(554, @$msg);
        $self->disconnect;
        return;
    }
    elsif ($rc == DENYSOFT_DISCONNECT) {
        $msg->[0] ||= "Message denied temporarily";
        $self->respond(451, @$msg);
        $self->disconnect;
        return;
    }
    return $self->respond(503, "MAIL first") unless $self->transaction->sender;
    return $self->respond(503, "RCPT first") unless $self->transaction->recipients;
    
    $self->{header_lines} = '';
    $self->{data_size} = 0;
    $self->{in_header} = 1;
    $self->{max_size} = ($self->config('databytes'))[0] || 0;
    
    $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");

    $self->respond(354, "go ahead");

    # my $max_get = $self->{max_size} || 1048576;
    $self->{handle}->unshift_read(line => sub { $self->got_data($_[1], $_[2]) });
    return 1;
}

sub got_data {
    my ($self, $data, $eol) = @_;

    if ($data eq ".") {
        $self->end_of_data;
        return;
    }
    
    # add a transaction->blocked check back here when we have line by line plugin access...
    unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) {
        $data =~ s/^\.\./\./mg;
        
        if ($self->{in_header}) {
            print "In Header: $data\n";
            if ($data eq '') {
                # end of headers
                $self->{in_header} = 0;
                
                # ... need to check that we don't reformat any of the received lines.
                #
                # 3.8.2 Received Lines in Gatewaying
                #   When forwarding a message into or out of the Internet environment, a
                #   gateway MUST prepend a Received: line, but it MUST NOT alter in any
                #   way a Received: line that is already in the header.
                my @header_lines = split(/^/m, $self->{header_lines});
    
                my $header = Mail::Header->new(\...@header_lines,
                                                Modify => 0, MailFrom => "COERCE");
                $self->transaction->header($header);
                $self->{header_lines} = '';

                # FIXME - call plugins to work on just the header here; can
                # save us buffering the mail content.
            
                # Save the start of just the body itself
                $self->transaction->set_body_start();
            }
            else {
                $self->{header_lines} .= $data . $eol;
            }
        }

        $data .= $eol; # append EOL
        $self->transaction->body_write($data);
        $self->{data_size} += length $data;
    }
    
    $self->{handle}->unshift_read(line => sub { $self->got_data($_[1], $_[2]) });
}

sub end_of_data {
    my $self = shift;
    
    #$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300);
    
    $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");
    
    my $header = $self->transaction->header;
    if (!$header) {
        $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
        $self->transaction->header($header);
    }
    
    my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP";
    my $esmtp = substr($smtp,0,1) eq "E";
    my $authheader;
    my $sslheader;
    
    if (defined $self->connection->notes('tls_enabled')
            and $self->connection->notes('tls_enabled'))
    {
        $smtp .= "S" if $esmtp; # RFC3848
        $sslheader = "(".$self->connection->notes('tls_socket')->get_cipher()." encrypted) ";
    }
    
    if (defined $self->{_auth} and $self->{_auth} == OK) {
        $smtp .= "A" if $esmtp; # RFC3848
        $authheader = "(smtp-auth username $self->{_auth_user}, mechanism $self->{_auth_mechanism})\n";
    }
    
    $header->add("Received", $self->received_line($smtp, $authheader, $sslheader), 0);
    
    return $self->respond(552, "Message too big!") if $self->{max_size} and $self->{data_size} > $self->{max_size};
    
    my ($rc, $msg) = $self->run_hooks("data_post");
    return 1;
}

1;

Reply via email to