OK, here's the files as they currently stand. The big note on this is that
I have done very little testing, and most importantly I have NOT updated
any of the plugins (see the async/* dir for those that need re-written to
use AnyEvent, mostly to use AnyEvent::DNS. Also the tls plugin would
need updated/rewritten, though it's probably a lot simpler with AnyEvent).
I'll be trying to do performance testing tomorrow (comparing with
qpsmtpd-async, NOT any of the other models).
Matt.
#!/usr/bin/perl
use lib "./lib";
BEGIN {
delete $ENV{ENV};
delete $ENV{BASH_ENV};
$ENV{PATH} = '/bin:/usr/bin:/var/qmail/bin:/usr/local/bin';
}
# Profiling - requires Devel::Profiler 0.05
#BEGIN { $Devel::Profiler::NO_INIT = 1; }
#use Devel::Profiler;
use strict;
use vars qw($DEBUG);
use FindBin qw();
# TODO: need to make this taint friendly
use lib "$FindBin::Bin/lib";
use Qpsmtpd::AnyEvent;
use Qpsmtpd::ConfigServer;
use Qpsmtpd::Constants;
use Carp;
use POSIX qw(WNOHANG);
use Getopt::Long;
use List::Util qw(shuffle);
use Socket;
use AnyEvent::Socket;
use AnyEvent::Util;
$|++;
$SIG{'PIPE'} = "IGNORE"; # handled manually
$DEBUG = 0;
my $PORT = 2525;
my $LOCALADDR = '0.0.0.0';
my $PROCS = 1;
my $USER = (getpwuid $>)[0]; # user to suid to
$USER = "smtpd" if $USER eq "root";
my $PAUSED = 0;
my $NUMACCEPT = 20;
my $PID_FILE = '';
my $ACCEPT_RSET;
my $DETACH; # daemonize on startup
# make sure we don't spend forever doing accept()
use constant ACCEPT_MAX => 1000;
sub reset_num_accept {
$NUMACCEPT = 20;
}
sub help {
print <<EOT;
Usage:
qpsmtpd [OPTIONS]
Options:
-l, --listen-address addr : listen on a specific address; default 0.0.0.0
-p, --port P : listen on a specific port; default 2525
-u, --user U : run as a particular user; defualt 'smtpd'
-j, --procs J : spawn J processes; default 1
-d, --detach : detach from controlling terminal (daemonize)
--pid-file P : print main servers PID to file P
-h, --help : this page
EOT
exit(0);
}
GetOptions(
'p|port=i' => \$PORT,
'l|listen-address=s' => \$LOCALADDR,
'j|procs=i' => \$PROCS,
'v|verbose+' => \$DEBUG,
'u|user=s' => \$USER,
'pid-file=s' => \$PID_FILE,
'd|detach' => \$DETACH,
'h|help' => \&help,
) || help();
# detaint the commandline
if ($PORT =~ /^(\d+)$/) { $PORT = $1 } else { &help }
if ($LOCALADDR =~ /^([\d\w\-.]+)$/) { $LOCALADDR = $1 } else { &help }
if ($USER =~ /^([\w\-]+)$/) { $USER = $1 } else { &help }
if ($PROCS =~ /^(\d+)$/) { $PROCS = $1 } else { &help }
use constant READY => 1;
use constant ACCEPTING => 2;
use constant RESTARTING => 999;
my $CURRENT_PROCS = 0;
my %childstatus = ();
my $SERVER;
if ($PID_FILE && -r $PID_FILE) {
open PID, "<$PID_FILE"
or die "open_pidfile $PID_FILE: $!\n";
my $running_pid = <PID> || ''; chomp $running_pid;
if ($running_pid =~ /^(\d+)/) {
if (kill 0, $running_pid) {
die "Found an already running qpsmtpd with pid $running_pid.\n";
}
}
close(PID);
}
run_as_server();
exit(0);
sub spawn_child {
my ($plugin_loader) = @_;
my $pid = fork;
if ($pid) {
$childstatus{$pid}++;
$CURRENT_PROCS++;
return $pid;
}
elsif (!defined($pid)) {
die "Unable to fork";
}
$plugin_loader->run_hooks('post-fork');
accept_loop();
}
sub accept_loop {
my $w = AE::io $SERVER, 0, sub {
print "Connect?\n";
while ($SERVER && (my $peer = accept my $fh, $SERVER)) {
fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside
world does not
my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $peer;
my $qp = Qpsmtpd::AnyEvent->new($fh, format_address($host),
$service);
$qp->process_line("Connect");
}
};
AnyEvent->condvar->wait;
exit;
}
sub sig_hup {
kill 1, keys %childstatus;
}
sub sig_chld {
my $spawn_count = 0;
while ( (my $child = waitpid(-1,WNOHANG)) > 0) {
if (!defined $childstatus{$child}) {
next;
}
last unless $child > 0;
print "SIGCHLD: child $child died\n";
delete $childstatus{$child};
$CURRENT_PROCS--;
}
$SIG{CHLD} = \&sig_chld;
}
sub HUNTSMAN {
$SIG{CHLD} = 'DEFAULT';
kill 'INT' => keys %childstatus;
if ($PID_FILE && -e $PID_FILE) {
unlink $PID_FILE or ::log(LOGERROR, "unlink: $PID_FILE: $!");
}
exit(0);
}
sub _connect_sock {
my $ipn = parse_address($LOCALADDR) or die "cannot parse '$LOCALADDR' as
host address";
my $af = address_family $ipn;
socket(my $sock, $af, SOCK_STREAM, 0) or die "socket: $!";
setsockopt($sock, SOL_SOCKET, SO_REUSEADDR, 1)
or die "tcp_server/so_reuseaddr: $!";
bind $sock, AnyEvent::Socket::pack_sockaddr($PORT, $ipn)
or die "bind: $!";
fh_nonblocking($sock, 1);
listen($sock, 128);
return $sock;
}
sub run_as_server {
AnyEvent::detect;
$SERVER = _connect_sock();
# Drop priviledges
my (undef, undef, $quid, $qgid) = getpwnam $USER or
die "unable to determine uid/gid for $USER\n";
my $groups = "$qgid $qgid";
while (my (undef, undef, $gid, $members) = getgrent) {
my @m = split(/ /, $members);
if (grep { $_ eq $USER } @m) {
$groups .= " $gid";
}
}
endgrent;
$) = $groups;
POSIX::setgid($qgid) or
die "unable to change gid: $!\n";
POSIX::setuid($quid) or
die "unable to change uid: $!\n";
$> = $quid;
# Load plugins here
my $plugin_loader = Qpsmtpd::SMTP->new();
$plugin_loader->load_plugins;
if ($DETACH) {
open STDIN, '/dev/null' or die "/dev/null: $!";
open STDOUT, '>/dev/null' or die "/dev/null: $!";
open STDERR, '>&STDOUT' or die "open(stderr): $!";
defined (my $pid = fork) or die "fork: $!";
exit 0 if $pid;
POSIX::setsid or die "setsid: $!";
}
if ($PID_FILE) {
open PID, ">$PID_FILE" || die "$PID_FILE: $!";
print PID $$,"\n";
close PID;
}
$plugin_loader->log(LOGINFO, 'Running as user '.
(getpwuid($>) || $>) .
', group '.
(getgrgid($)) || $)));
$plugin_loader->log(LOGDEBUG, "Listening on $PORT with $PROCS children");
while (1) {
if ($PROCS > 1) {
$SIG{CHLD} = $SIG{HUP} = 'IGNORE';
while ($CURRENT_PROCS < $PROCS) {
spawn_child($plugin_loader);
}
$SIG{INT} = $SIG{TERM} = \&HUNTSMAN;
$SIG{CHLD} = \&sig_chld;
$SIG{HUP} = \&sig_hup;
# multi-process mode, just sleep
sleep;
# got here because a signal received?
print "Signal received...\n";
}
else {
accept_loop;
}
}
}
sub log {
my ($level,$message) = @_;
# $level not used yet. this is reimplemented from elsewhere anyway
warn("$$ fd:? $message\n");
}
package Qpsmtpd::AnyEvent;
use base ('Qpsmtpd::SMTP');
use Qpsmtpd::Constants;
use Qpsmtpd::Address;
use Mail::Header;
use POSIX qw(strftime);
use Time::HiRes qw(time);
use AnyEvent;
use AnyEvent::DNS;
use AnyEvent::Handle;
use Socket;
use strict;
sub max_idle_time { 60 }
sub max_connect_time { 1200 }
sub input_sock {
my $self = shift;
@_ and die "Read only";
$self->{handle}->fh;
}
sub new {
my ($class, $fh, $host, $port) = @_;
my $self = $class->SUPER::new();
my $hdl = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
my ($hdl, $fatal, $msg) = @_;
warn "got error $msg\n";
$hdl->destroy;
}
);
$hdl->push_read(line => sub {
my ($h, $line) = @_;
$self->process_line($line);
});
$self->{start_time} = time;
$self->{mode} = 'connect';
$self->{handle} = $hdl;
$self->{remote_host} = $host;
$self->{remote_port} = $port;
$self->load_plugins;
$self->load_logging;
my ($rc, @msg) = $self->run_hooks_no_respond("pre-connection");
if ($rc == DENYSOFT || $rc == DENYSOFT_DISCONNECT) {
@msg = ("Sorry, try again later")
unless @msg;
$self->respond(451, @msg);
$self->disconnect;
}
elsif ($rc == DENY || $rc == DENY_DISCONNECT) {
@msg = ("Sorry, service not available for you")
unless @msg;
$self->respond(550, @msg);
$self->disconnect;
}
$self->reset_for_next_message();
$self->{mode} = 'connect';
return $self;
}
sub uptime {
my $self = shift;
return (time() - $self->{start_time});
}
sub reset_for_next_message {
my $self = shift;
$self->{_commands} = {
ehlo => 1,
helo => 1,
rset => 1,
mail => 1,
rcpt => 1,
data => 1,
help => 1,
vrfy => 1,
noop => 1,
quit => 1,
auth => 0, # disabled by default
};
$self->{mode} = 'cmd';
$self->{_extras} = {};
}
sub respond {
my $self = shift;
my ($code, @messages) = @_;
while (my $msg = shift @messages) {
my $line = $code . (@messages ? "-" : " ") . $msg;
$self->{handle}->push_write("$line\r\n");
}
return 1;
}
# there's no "pause" needed on AnyEvent, because we are only interested
# in reading after we write something, which is handled in respond() above.
sub continue_read {}
sub pause_read {}
my %cmd_cache;
sub process_line {
my $self = shift;
my $line = shift || return;
if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line\n"; }
if ($self->{mode} eq 'cmd') {
$line =~ s/\r?\n$//s;
$self->connection->notes('original_string', $line);
my ($cmd, @params) = split(/ +/, $line, 2);
my $meth = lc($cmd);
if (my $lookup = $cmd_cache{$meth} || $self->{_commands}->{$meth} && $self->can($meth)) {
$cmd_cache{$meth} = $lookup;
eval {
$lookup->($self, @params);
};
if ($@) {
my $error = $@;
chomp($error);
$self->log(LOGERROR, "Command Error: $error");
$self->fault("command '$cmd' failed unexpectedly");
}
}
else {
# No such method - i.e. unrecognized command
my ($rc, $msg) = $self->run_hooks("unrecognized_command", $meth, @params);
}
$self->{handle}->push_read(line => sub { $self->process_line($_[1]) });
}
elsif ($self->{mode} eq 'connect') {
$self->{mode} = 'cmd';
# I've removed an eval{} from around this. It shouldn't ever die()
# but if it does we're a bit screwed... Ah well :-)
$self->start_conversation;
}
else {
die "Unknown mode";
}
return;
}
sub disconnect {
my $self = shift;
$self->SUPER::disconnect(@_);
$self->{handle}->destroy;
}
sub close {
my $self = shift;
$self->run_hooks_no_respond("post-connection");
$self->connection->reset;
$self->{handle}->destroy;
}
sub start_conversation {
my $self = shift;
my $conn = $self->connection;
# set remote_host, remote_ip and remote_port
my ($ip, $port) = $self->{remote_host}, $self->{remote_port};
return $self->close() unless $ip;
$conn->remote_ip($ip);
$conn->remote_port($port);
$conn->remote_info("[$ip]");
my $pn = getsockname($self->{handle}->fh);
my ($port, $iaddr) = Socket::sockaddr_in($pn);
$self->{local_port} = $port;
$self->{local_ip} = Socket::inet_ntoa($iaddr);
$conn->local_ip($self->{local_ip});
$conn->local_port($self->{local_port});
print "Looking up IP: $ip (L: $self->{local_ip})\n";
AnyEvent::DNS::reverse_lookup(
$ip, sub {
print "Reverse lookup returned: $_[0]\n";
if (!...@_) {
$conn->remote_info($conn->remote_host("DNSERROR"));
}
else {
$conn->remote_info($conn->remote_host($_[0]));
}
$self->run_hooks("connect");
}
);
return;
}
sub data {
my $self = shift;
my ($rc, $msg) = $self->run_hooks("data");
return 1;
}
sub data_respond {
my $self = shift;
my ($rc, $msg) = @_;
if ($rc == DONE) {
return;
}
elsif ($rc == DENY) {
$msg->[0] ||= "Message denied";
$self->respond(554, @$msg);
$self->reset_transaction();
return;
}
elsif ($rc == DENYSOFT) {
$msg->[0] ||= "Message denied temporarily";
$self->respond(451, @$msg);
$self->reset_transaction();
return;
}
elsif ($rc == DENY_DISCONNECT) {
$msg->[0] ||= "Message denied";
$self->respond(554, @$msg);
$self->disconnect;
return;
}
elsif ($rc == DENYSOFT_DISCONNECT) {
$msg->[0] ||= "Message denied temporarily";
$self->respond(451, @$msg);
$self->disconnect;
return;
}
return $self->respond(503, "MAIL first") unless $self->transaction->sender;
return $self->respond(503, "RCPT first") unless $self->transaction->recipients;
$self->{header_lines} = '';
$self->{data_size} = 0;
$self->{in_header} = 1;
$self->{max_size} = ($self->config('databytes'))[0] || 0;
$self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");
$self->respond(354, "go ahead");
# my $max_get = $self->{max_size} || 1048576;
$self->{handle}->unshift_read(line => sub { $self->got_data($_[1], $_[2]) });
return 1;
}
sub got_data {
my ($self, $data, $eol) = @_;
if ($data eq ".") {
$self->end_of_data;
return;
}
# add a transaction->blocked check back here when we have line by line plugin access...
unless (($self->{max_size} and $self->{data_size} > $self->{max_size})) {
$data =~ s/^\.\./\./mg;
if ($self->{in_header}) {
print "In Header: $data\n";
if ($data eq '') {
# end of headers
$self->{in_header} = 0;
# ... need to check that we don't reformat any of the received lines.
#
# 3.8.2 Received Lines in Gatewaying
# When forwarding a message into or out of the Internet environment, a
# gateway MUST prepend a Received: line, but it MUST NOT alter in any
# way a Received: line that is already in the header.
my @header_lines = split(/^/m, $self->{header_lines});
my $header = Mail::Header->new(\...@header_lines,
Modify => 0, MailFrom => "COERCE");
$self->transaction->header($header);
$self->{header_lines} = '';
# FIXME - call plugins to work on just the header here; can
# save us buffering the mail content.
# Save the start of just the body itself
$self->transaction->set_body_start();
}
else {
$self->{header_lines} .= $data . $eol;
}
}
$data .= $eol; # append EOL
$self->transaction->body_write($data);
$self->{data_size} += length $data;
}
$self->{handle}->unshift_read(line => sub { $self->got_data($_[1], $_[2]) });
}
sub end_of_data {
my $self = shift;
#$self->log(LOGDEBUG, "size is at $size\n") unless ($i % 300);
$self->log(LOGDEBUG, "max_size: $self->{max_size} / size: $self->{data_size}");
my $header = $self->transaction->header;
if (!$header) {
$header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
$self->transaction->header($header);
}
my $smtp = $self->connection->hello eq "ehlo" ? "ESMTP" : "SMTP";
my $esmtp = substr($smtp,0,1) eq "E";
my $authheader;
my $sslheader;
if (defined $self->connection->notes('tls_enabled')
and $self->connection->notes('tls_enabled'))
{
$smtp .= "S" if $esmtp; # RFC3848
$sslheader = "(".$self->connection->notes('tls_socket')->get_cipher()." encrypted) ";
}
if (defined $self->{_auth} and $self->{_auth} == OK) {
$smtp .= "A" if $esmtp; # RFC3848
$authheader = "(smtp-auth username $self->{_auth_user}, mechanism $self->{_auth_mechanism})\n";
}
$header->add("Received", $self->received_line($smtp, $authheader, $sslheader), 0);
return $self->respond(552, "Message too big!") if $self->{max_size} and $self->{data_size} > $self->{max_size};
my ($rc, $msg) = $self->run_hooks("data_post");
return 1;
}
1;