On Thu, 2 Dec 2010, Aleksandar Lazic wrote:

On Don 02.12.2010 19:04, Matt Sergeant wrote:
On Thu, 2 Dec 2010, Ask Bjørn Hansen wrote:


On Dec 2, 2010, at 10:37, Aleksandar Lazic wrote:

Maybe we can run another benchmark, AnyEvent vs Danga::Socket, given
that AnyEvent with EV as the underlying event lib looks very fast
according to sources on the internet ;-)

Matt was (I'm guessing) testing a load that's artificial to anyone who's not just archiving spam from a spam trap. For the rest of us by far most of the resources go to the various spam filtering stuff, so the performance will be fine.

More important is the ease of use of the APIs, the general eco system etc. On those AnyEvent wins (in my opinion).

Sorry yeah I think I have it working, but our work data centre is down
right now so I can't get at the files :-(

Will email the latest ones here when I get the chance.

Thanks, I'm quite interested.

OK, the relevant files are attached. Nothing else needed changing I don't think.

AnyEvent.pm has to go in the lib/Qpsmtpd/ dir.
#!/usr/bin/perl

# qpsmtpd launcher that accepts connections through AnyEvent and hands each
# accepted socket to Qpsmtpd::AnyEvent.
# NOTE(review): `use warnings` is not enabled anywhere in this script.

use lib "./lib";
BEGIN {
    # Remove shell start-up hooks and pin PATH to a fixed list of directories
    # before any other module is loaded.
    delete $ENV{ENV};
    delete $ENV{BASH_ENV};
    $ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
}

# Profiling - requires Devel::Profiler 0.05
#BEGIN { $Devel::Profiler::NO_INIT = 1; }
#use Devel::Profiler;

use strict;
use vars qw($DEBUG);    # package global; read elsewhere as $::DEBUG
use FindBin qw();
# TODO: need to make this taint friendly
use lib "$FindBin::Bin/lib";
use Qpsmtpd::AnyEvent;
use Qpsmtpd::ConfigServer;
use Qpsmtpd::Constants;
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
use List::Util qw(shuffle);
use Socket;
use AnyEvent::Socket;
use AnyEvent::Util;

# Unbuffered STDOUT.
$|++;

$SIG{'PIPE'} = "IGNORE";  # handled manually

# Verbosity level; incremented once per -v/--verbose on the command line.
$DEBUG          = 0;

# Defaults for the command-line options parsed further down.
my $PORT        = 2525;
my $LOCALADDR   = '0.0.0.0';
my $PROCS       = 1;
my $USER        = (getpwuid $>)[0];         # user to suid to
   $USER        = "smtpd" if $USER eq "root";
# NOTE(review): $PAUSED, $ACCEPT_RSET and ACCEPT_MAX are not referenced by
# any code visible in this file; $NUMACCEPT is only touched by
# reset_num_accept().
my $PAUSED      = 0;
my $NUMACCEPT   = 20;
my $PID_FILE    = '';
my $ACCEPT_RSET;
my $DETACH;       # daemonize on startup

# make sure we don't spend forever doing accept()
use constant ACCEPT_MAX => 1000;

# Put the accept batch size back to its initial value of 20.
# Returns the value just assigned.
sub reset_num_accept {
    return $NUMACCEPT = 20;
}

# Print the command-line usage summary to STDOUT and exit with status 0.
# Used as the -h/--help handler and as the fallback when option parsing or
# validation fails. Never returns.
sub help {
    print <<EOT;
Usage:
    qpsmtpd [OPTIONS]

Options:
 -l, --listen-address addr : listen on a specific address; default 0.0.0.0
 -p, --port P              : listen on a specific port; default 2525
 -u, --user U              : run as a particular user; default 'smtpd'
 -j, --procs J             : spawn J processes; default 1
 -d, --detach              : detach from controlling terminal (daemonize)
     --pid-file P          : print main servers PID to file P
 -v, --verbose             : increase debug output; may be repeated
 
 -h, --help                : this page
EOT
    exit(0);
}

# Parse the command line. A parse failure prints the usage text and exits.
GetOptions(
    'l|listen-address=s' => \$LOCALADDR,
    'p|port=i'           => \$PORT,
    'u|user=s'           => \$USER,
    'j|procs=i'          => \$PROCS,
    'd|detach'           => \$DETACH,
    'pid-file=s'         => \$PID_FILE,
    'v|verbose+'         => \$DEBUG,
    'h|help'             => \&help,
) or help();

# detaint the commandline: each value is replaced by the part captured by its
# pattern; a value that does not match sends the user to the usage text.
for my $check (
    [ \$PORT,      qr/^(\d+)$/ ],
    [ \$LOCALADDR, qr/^([\d\w\-.]+)$/ ],
    [ \$USER,      qr/^([\w\-]+)$/ ],
    [ \$PROCS,     qr/^(\d+)$/ ],
) {
    my ($value_ref, $pattern) = @$check;
    if ($$value_ref =~ $pattern) {
        $$value_ref = $1;
    }
    else {
        help();
    }
}

# NOTE(review): READY, ACCEPTING and RESTARTING are not referenced by any
# code visible in this file.
use constant READY      => 1;
use constant ACCEPTING  => 2;
use constant RESTARTING => 999;

# Parent-side bookkeeping: number of live children, their pids (hash keys),
# and the shared listening socket.
my $CURRENT_PROCS = 0;
my %childstatus = ();
my $SERVER;

# Refuse to start when the pid file names a process that still accepts
# signal 0. The file is opened with three-arg open and a lexical handle so
# that the user-supplied path cannot be interpreted as an open mode.
if ($PID_FILE && -r $PID_FILE) {
    open my $pid_fh, '<', $PID_FILE
        or die "open_pidfile $PID_FILE: $!\n";
    my $running_pid = <$pid_fh>;
    close($pid_fh);
    $running_pid = '' unless defined $running_pid;
    chomp $running_pid;
    if ($running_pid =~ /^(\d+)/) {
        my $pid = $1;    # only the leading digits are used as the pid
        if (kill 0, $pid) {
            die "Found an already running qpsmtpd with pid $pid.\n";
        }
    }
}

run_as_server();

exit(0);

# Fork one worker process.
#
# In the parent: records the child's pid in %childstatus, bumps
# $CURRENT_PROCS and returns the pid.
# In the child: runs the 'post-fork' hooks on $plugin_loader and enters
# accept_loop(), which exits the process instead of returning.
# Dies (including the OS error) when fork fails.
sub spawn_child {
    my ($plugin_loader) = @_;

    my $pid = fork;
    die "Unable to fork: $!" unless defined $pid;

    if ($pid) {
        # parent
        $childstatus{$pid}++;
        $CURRENT_PROCS++;
        return $pid;
    }

    # child
    $plugin_loader->run_hooks('post-fork');

    accept_loop();
}

# Watch the listening socket $SERVER for readability and, whenever it fires,
# accept connections until accept() returns false. Each accepted socket is
# made non-blocking, wrapped in a Qpsmtpd::AnyEvent object and started by
# feeding it the pseudo-line "Connect".
#
# Blocks on a condition variable that nothing visible here ever signals, and
# calls exit should that wait return.
sub accept_loop {
    my $w = AE::io($SERVER, 0, sub {
        print "Connect?\n";
        while ($SERVER && (my $peer = accept(my $fh, $SERVER))) {
            # POSIX requires inheritance, the outside world does not
            fh_nonblocking($fh, 1);

            my ($service, $host) = AnyEvent::Socket::unpack_sockaddr($peer);
            my $qp = Qpsmtpd::AnyEvent->new($fh, format_address($host), $service);
            $qp->process_line("Connect");
        }
    });

    AnyEvent->condvar->wait;
    exit;
}

# HUP handler installed in the parent: sends signal number 1 to every pid
# recorded in %childstatus and returns what kill returns.
sub sig_hup {
    my @children = keys %childstatus;
    return kill 1, @children;
}

# CHLD handler installed in the parent: reaps every exited child without
# blocking. For pids tracked in %childstatus it prints a notice, forgets the
# pid and decrements $CURRENT_PROCS; untracked pids are reaped silently.
# Re-installs itself as the CHLD handler before returning.
sub sig_chld {
    while ((my $child = waitpid(-1, WNOHANG)) > 0) {
        next unless defined $childstatus{$child};

        print "SIGCHLD: child $child died\n";
        delete $childstatus{$child};
        $CURRENT_PROCS--;
    }

    $SIG{CHLD} = \&sig_chld;
}

# INT/TERM handler installed in the parent: restores default CHLD handling,
# sends INT to every tracked child, removes the pid file when one exists
# (logging a failed unlink), and exits with status 0.
sub HUNTSMAN {
    $SIG{CHLD} = 'DEFAULT';

    my @children = keys %childstatus;
    kill 'INT' => @children;

    if ($PID_FILE and -e $PID_FILE) {
        unless (unlink $PID_FILE) {
            ::log(LOGERROR, "unlink: $PID_FILE: $!");
        }
    }

    exit(0);
}

# Create the listening socket for $LOCALADDR:$PORT.
#
# Parses the address with AnyEvent::Socket, creates a stream socket of the
# matching address family with SO_REUSEADDR set, binds it, makes it
# non-blocking and starts listening with a backlog of 128.
# Returns the socket handle; dies with the OS error if any step fails.
sub _connect_sock {
    my $ipn = parse_address($LOCALADDR)
        or die "cannot parse '$LOCALADDR' as host address";

    my $af = address_family($ipn);

    socket(my $sock, $af, SOCK_STREAM, 0) or die "socket: $!";
    setsockopt($sock, SOL_SOCKET, SO_REUSEADDR, 1)
        or die "tcp_server/so_reuseaddr: $!";

    bind($sock, AnyEvent::Socket::pack_sockaddr($PORT, $ipn))
        or die "bind: $!";

    fh_nonblocking($sock, 1);
    listen($sock, 128) or die "listen: $!";

    return $sock;
}

# Main server routine. In order:
#   1. opens the listening socket (while still running with the start-up
#      privileges);
#   2. switches group list, gid and uid to those of $USER;
#   3. loads the plugins once, before any fork;
#   4. optionally detaches from the terminal;
#   5. writes the pid file, if requested;
#   6. either keeps $PROCS children alive (when $PROCS > 1) or runs the
#      accept loop in this process.
# Dies when the user lookup, the privilege change, detaching or the pid file
# fails.
sub run_as_server {
    AnyEvent::detect();

    $SERVER = _connect_sock();

    # Drop privileges
    my (undef, undef, $quid, $qgid) = getpwnam($USER) or
          die "unable to determine uid/gid for $USER\n";
    # Collect the supplementary groups that list $USER as a member.
    my $groups = "$qgid $qgid";
    while (my (undef, undef, $gid, $members) = getgrent()) {
        my @m = split(/ /, $members);
        if (grep { $_ eq $USER } @m) {
            $groups .= " $gid";
        }
    }
    endgrent();
    $) = $groups;
    POSIX::setgid($qgid) or
          die "unable to change gid: $!\n";
    POSIX::setuid($quid) or
          die "unable to change uid: $!\n";
    $> = $quid;

    # Load plugins here
    my $plugin_loader = Qpsmtpd::SMTP->new();
    $plugin_loader->load_plugins;

    if ($DETACH) {
        open STDIN, '<', '/dev/null' or die "/dev/null: $!";
        open STDOUT, '>', '/dev/null' or die "/dev/null: $!";
        open STDERR, '>&STDOUT' or die "open(stderr): $!";
        defined (my $pid = fork) or die "fork: $!";
        exit 0 if $pid;
        POSIX::setsid or die "setsid: $!";
    }

    if ($PID_FILE) {
        # Low-precedence `or` so that a failed open is actually reported.
        open my $pid_fh, '>', $PID_FILE or die "$PID_FILE: $!";
        print {$pid_fh} $$, "\n";
        close $pid_fh or die "$PID_FILE: $!";
    }

    $plugin_loader->log(LOGINFO, 'Running as user '.
        (getpwuid($>) || $>) .
        ', group '.
        (getgrgid($)) || $)));

    $plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children");

    while (1) {
        if ($PROCS > 1) {
            # Ignore CHLD/HUP while topping up the pool of children.
            $SIG{CHLD} = $SIG{HUP} = 'IGNORE';
            while ($CURRENT_PROCS < $PROCS) {
                spawn_child($plugin_loader);
            }
            $SIG{INT} = $SIG{TERM} = \&HUNTSMAN;

            $SIG{CHLD} = \&sig_chld;
            $SIG{HUP}  = \&sig_hup;

            # multi-process mode, just sleep
            sleep;
            # got here because a signal received?
            print "Signal received...\n";
        }
        else {
            accept_loop();
        }
    }
}

# Minimal logger for the launcher: emits "<pid> fd:? <message>" via warn.
# The level argument is accepted but not used yet; this is reimplemented
# from elsewhere anyway.
sub log {
    my (undef, $text) = @_;
    return warn "$$ fd:? $text\n";
}
package Qpsmtpd::AnyEvent;

use base ('Qpsmtpd::SMTP');
use Qpsmtpd::Constants;
use Qpsmtpd::Address;
use Mail::Header;
use POSIX qw(strftime);
use Time::HiRes qw(time);
use AnyEvent;
use AnyEvent::DNS;
use AnyEvent::Handle;
use Socket;
use strict;

# Fixed time limits for a connection (presumably seconds; confirm against
# the callers, which are not visible here).
sub max_idle_time    { return 60 }
sub max_connect_time { return 1200 }

# Read-only accessor: returns the file handle held by the connection's
# AnyEvent::Handle. Dies with "Read only" when called with an argument.
sub input_sock {
    my ($self, @extra) = @_;
    die "Read only" if @extra;
    return $self->{handle}->fh;
}

# Construct a connection object around an accepted socket.
#
#   $fh    - the accepted, non-blocking socket
#   $host  - remote address as text
#   $port  - remote port
#
# Wraps $fh in an AnyEvent::Handle, queues a read for the first input line,
# loads plugins and logging, and runs the "pre-connection" hooks. A deny
# result from those hooks sends a 451 (soft) or 550 (hard) reply and
# disconnects. The object is returned in 'connect' mode either way.
sub new {
    my ($class, $fh, $host, $port) = @_;

    my $self = $class->SUPER::new();

    my $handle = AnyEvent::Handle->new(
        fh       => $fh,
        on_error => sub {
            my ($failed, $fatal, $reason) = @_;
            warn "got error $reason\n";
            $failed->destroy;
        },
    );

    # First line from the client goes straight to process_line.
    $handle->push_read(line => sub { $self->process_line($_[1]) });

    $self->{start_time}  = time();
    $self->{mode}        = 'connect';
    $self->{handle}      = $handle;
    $self->{remote_host} = $host;
    $self->{remote_port} = $port;

    $self->load_plugins;
    $self->load_logging;

    my ($rc, @msg) = $self->run_hooks_no_respond("pre-connection");

    if ($rc == DENYSOFT || $rc == DENYSOFT_DISCONNECT) {
        push @msg, "Sorry, try again later" if !@msg;
        $self->respond(451, @msg);
        $self->disconnect;
    }
    elsif ($rc == DENY || $rc == DENY_DISCONNECT) {
        push @msg, "Sorry, service not available for you" if !@msg;
        $self->respond(550, @msg);
        $self->disconnect;
    }

    # reset_for_next_message switches to 'cmd' mode, so put the mode back.
    $self->reset_for_next_message();
    $self->{mode} = 'connect';

    return $self;
}

# Time elapsed since the object recorded its start_time.
sub uptime {
    my ($self) = @_;
    my $now = time();
    return $now - $self->{start_time};
}

# Reset per-message state: install the default table of enabled commands
# (everything on except auth), switch to 'cmd' mode and clear _extras.
sub reset_for_next_message {
    my ($self) = @_;

    my %enabled = map { $_ => 1 }
        qw(ehlo helo rset mail rcpt data help vrfy noop quit);
    $enabled{auth} = 0;    # disabled by default

    $self->{_commands} = \%enabled;
    $self->{mode}      = 'cmd';
    $self->{_extras}   = {};
}

# Queue an SMTP reply on the connection handle.
#
#   $code      - numeric reply code
#   @messages  - one text line each; undefined and empty entries are skipped
#
# Every line but the last is written as "CODE-text", the last one as
# "CODE text", each terminated by CRLF. Always returns 1.
sub respond {
    my $self = shift;
    my ($code, @messages) = @_;

    # Filter up front instead of looping on the truth of each message: a
    # message of "0" or an empty entry must not cut the reply short and
    # leave it ending on a continuation line.
    my @lines = grep { defined($_) && length($_) } @messages;

    while (@lines) {
        my $msg = shift @lines;
        my $sep = @lines ? "-" : " ";
        $self->{handle}->push_write("$code$sep$msg\r\n");
    }
    return 1;
}

# Both are deliberate no-ops: there's no "pause" needed on AnyEvent, because
# we are only interested in reading after we write something, which is
# handled in respond() above.
sub continue_read { return; }
sub pause_read    { return; }

# Cache of method lookups by lower-cased command name, shared by every
# connection in this process.
my %cmd_cache;

# Handle one line of input according to the current mode.
#
#   'cmd'      - parse "VERB args", dispatch to the method named after the
#                verb if that command is enabled for this connection,
#                otherwise run the "unrecognized_command" hooks; then queue
#                the next line read.
#   'connect'  - switch to 'cmd' mode and start the conversation.
#
# An undefined or empty line is ignored, but in 'cmd' mode the next read is
# still queued so the connection does not stall. Dies on any other mode.
sub process_line {
    my ($self, $line) = @_;

    if (!defined $line || $line eq '') {
        $self->{handle}->push_read(line => sub { $self->process_line($_[1]) })
            if $self->{mode} eq 'cmd';
        return;
    }

    if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line\n"; }
    if ($self->{mode} eq 'cmd') {
        $line =~ s/\r?\n$//s;
        $self->connection->notes('original_string', $line);
        my ($cmd, @params) = split(/ +/, $line, 2);
        my $meth = lc($cmd);

        # The per-connection enable flag is checked first; the shared cache
        # only saves the can() lookup and must never enable a command.
        my $lookup = $self->{_commands}->{$meth}
            && ($cmd_cache{$meth} ||= $self->can($meth));

        if ($lookup) {
            eval {
                $lookup->($self, @params);
            };
            if ($@) {
                my $error = $@;
                chomp($error);
                $self->log(LOGERROR, "Command Error: $error");
                $self->fault("command '$cmd' failed unexpectedly");
            }
        }
        else {
            # No such method - i.e. unrecognized command
            $self->run_hooks("unrecognized_command", $meth, @params);
        }
        $self->{handle}->push_read(line => sub { $self->process_line($_[1]) });
    }
    elsif ($self->{mode} eq 'connect') {
        $self->{mode} = 'cmd';
        # I've removed an eval{} from around this. It shouldn't ever die()
        # but if it does we're a bit screwed... Ah well :-)
        $self->start_conversation;
    }
    else {
        die "Unknown mode";
    }
    return;
}

# Run the parent class's disconnect with the same arguments, then destroy
# the AnyEvent handle.
# NOTE(review): destroy discards anything still sitting in the handle's
# write buffer - confirm that replies queued just before a disconnect are
# always flushed in practice.
sub disconnect {
    my ($self, @args) = @_;
    $self->SUPER::disconnect(@args);
    return $self->{handle}->destroy;
}

# Tear the connection down: run the "post-connection" hooks, reset the
# connection object and destroy the AnyEvent handle.
sub close {
    my ($self) = @_;

    $self->run_hooks_no_respond("post-connection");
    $self->connection->reset;

    return $self->{handle}->destroy;
}

# Begin the SMTP conversation for a freshly accepted connection.
#
# Copies the remote address and port onto the connection object, records the
# local address and port of the socket, then starts an asynchronous reverse
# DNS lookup of the remote address. When the lookup completes, remote_host
# and remote_info are set to the first returned name (or "DNSERROR" when
# nothing came back) and the "connect" hooks are run.
#
# Closes the connection and returns its result when no remote address is
# known; otherwise returns nothing.
# NOTE(review): sockaddr_in/inet_ntoa only understand IPv4 socket names -
# confirm behaviour when listening on an IPv6 address.
sub start_conversation {
    my $self = shift;

    my $conn = $self->connection;
    # set remote_host, remote_ip and remote_port
    # (parentheses matter: without them only $ip would be assigned)
    my ($ip, $port) = ($self->{remote_host}, $self->{remote_port});
    return $self->close() unless $ip;
    $conn->remote_ip($ip);
    $conn->remote_port($port);
    $conn->remote_info("[$ip]");

    my $pn = getsockname($self->{handle}->fh);
    my ($local_port, $local_iaddr) = Socket::sockaddr_in($pn);

    $self->{local_port} = $local_port;
    $self->{local_ip}   = Socket::inet_ntoa($local_iaddr);

    $conn->local_ip($self->{local_ip});
    $conn->local_port($self->{local_port});

    #print "Looking up IP: $ip (L: $self->{local_ip})\n";

    AnyEvent::DNS::reverse_lookup(
        $ip, sub {
            #print "Reverse lookup returned: $_[0]\n";
            if (!@_) {
                $conn->remote_info($conn->remote_host("DNSERROR"));
            }
            else {
                $conn->remote_info($conn->remote_host($_[0]));
            }
            $self->run_hooks("connect");
        }
    );

    return;
}

# Handler for the DATA command: runs the "data" hooks and returns 1. The
# hook results are not inspected here.
sub data {
    my ($self) = @_;
    my @hook_result = $self->run_hooks("data");
    return 1;
}

# Act on the outcome of the "data" hooks.
#
#   $rc   - hook result code
#   $msg  - array ref of reply text lines; the first entry is defaulted
#           when a denial has no text
#
# DONE: nothing to do. Any of the four deny codes: send the matching
# 554/451 reply, then either reset the transaction or disconnect.
# Otherwise, provided a sender and at least one recipient are set, reset
# the per-message counters, reply 354 and queue the first body-line read.
sub data_respond {
    my ($self, $rc, $msg) = @_;

    return if $rc == DONE;

    # verdict, SMTP code, default text, follow-up method
    my @denials = (
        [ DENY,                554, "Message denied",             'reset_transaction' ],
        [ DENYSOFT,            451, "Message denied temporarily", 'reset_transaction' ],
        [ DENY_DISCONNECT,     554, "Message denied",             'disconnect' ],
        [ DENYSOFT_DISCONNECT, 451, "Message denied temporarily", 'disconnect' ],
    );
    for my $rule (@denials) {
        my ($verdict, $smtp_code, $default_text, $followup) = @$rule;
        next unless $rc == $verdict;

        $msg->[0] ||= $default_text;
        $self->respond($smtp_code, @$msg);
        $self->$followup();
        return;
    }

    if (!$self->transaction->sender) {
        return $self->respond(503, "MAIL first");
    }
    if (!$self->transaction->recipients) {
        return $self->respond(503, "RCPT first");
    }

    my ($databytes) = $self->config('databytes');

    $self->{header_lines} = '';
    $self->{data_size}    = 0;
    $self->{in_header}    = 1;
    $self->{max_size}     = $databytes || 0;

    $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");

    $self->respond(354, "go ahead");

    # my $max_get = $self->{max_size} || 1048576;
    $self->{handle}->unshift_read(line => sub { $self->got_data($_[1], $_[2]) });
    return 1;
}

# Consume one line of the message body while in the DATA phase.
#
#   $data  - the line without its terminator
#   $eol   - the terminator that was stripped from it
#
# A lone "." ends the message and hands over to end_of_data. Otherwise,
# unless the size limit has already been exceeded, a leading ".." is
# collapsed to ".", header lines are collected until the first empty line
# (at which point a Mail::Header is built and attached to the transaction),
# and the line is appended to the transaction body. Lines arriving after
# the limit was exceeded are read and dropped. The next line read is queued
# in every case except end of data.
sub got_data {
    my ($self, $data, $eol) = @_;

    if ($data eq ".") {
        $self->end_of_data;
        return;
    }

    # add a transaction->blocked check back here when we have line by line plugin access...
    unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) {
        $data =~ s/^\.\./\./mg;

        if ($self->{in_header}) {
            #print "In Header: $data\n";
            if ($data eq '') {
                # end of headers
                $self->{in_header} = 0;

                # ... need to check that we don't reformat any of the received lines.
                #
                # 3.8.2 Received Lines in Gatewaying
                #   When forwarding a message into or out of the Internet environment, a
                #   gateway MUST prepend a Received: line, but it MUST NOT alter in any
                #   way a Received: line that is already in the header.
                my @header_lines = split(/^/m, $self->{header_lines});

                my $header = Mail::Header->new(\@header_lines,
                                                Modify => 0, MailFrom => "COERCE");
                $self->transaction->header($header);
                $self->{header_lines} = '';

                # FIXME - call plugins to work on just the header here; can
                # save us buffering the mail content.

                # Save the start of just the body itself
                $self->transaction->set_body_start();
            }
            else {
                $self->{header_lines} .= $data . $eol;
            }
        }

        $data .= $eol; # append EOL
        $self->transaction->body_write($data);
        $self->{data_size} += length $data;
    }

    $self->{handle}->unshift_read(line => sub { $self->got_data($_[1], $_[2]) });
}

# Finish the DATA phase once the terminating "." has been seen.
#
# Makes sure the transaction has a header object, works out the protocol
# keyword for the Received line (SMTP/ESMTP, with "S" appended for TLS and
# "A" for a successful AUTH when ESMTP is in use), adds the Received header,
# rejects with 552 when the message exceeded the size limit, and otherwise
# runs the "data_post" hooks and returns 1.
sub end_of_data {
    my ($self) = @_;

    #$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300);

    $self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");

    my $header = $self->transaction->header;
    unless ($header) {
        $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
        $self->transaction->header($header);
    }

    my $smtp = "SMTP";
    $smtp = "ESMTP" if $self->connection->hello eq "ehlo";
    my $is_esmtp = ($smtp eq "ESMTP");

    my ($authheader, $sslheader);

    if (defined $self->connection->notes('tls_enabled')
            && $self->connection->notes('tls_enabled'))
    {
        $smtp .= "S" if $is_esmtp; # RFC3848
        my $cipher = $self->connection->notes('tls_socket')->get_cipher();
        $sslheader = "(" . $cipher . " encrypted) ";
    }

    if (defined $self->{_auth} && $self->{_auth} == OK) {
        $smtp .= "A" if $is_esmtp; # RFC3848
        $authheader = "(smtp-auth username $self->{_auth_user}, mechanism $self->{_auth_mechanism})\n";
    }

    $header->add("Received", $self->received_line($smtp, $authheader, $sslheader), 0);

    if ($self->{max_size} and $self->{data_size} > $self->{max_size}) {
        return $self->respond(552, "Message too big!");
    }

    my @hook_result = $self->run_hooks("data_post");
    return 1;
}

1;

Reply via email to