Author: vetinari
Date: Tue Aug 21 07:20:41 2007
New Revision: 774
Modified:
contrib/vetinari/experimental/chunking
Log:
chunking plugin update: fixes, additions
- now supports Apache::Qpsmtpd, qpsmtpd-forkserver, qpsmtpd-prefork
- fix missing empty line between header and body
- add received_line hook
Modified: contrib/vetinari/experimental/chunking
==============================================================================
--- contrib/vetinari/experimental/chunking (original)
+++ contrib/vetinari/experimental/chunking Tue Aug 21 07:20:41 2007
@@ -33,23 +33,65 @@
DON'T USE :P
+=head1 TODO
+
+=over 4
+
+=item headers
+
+get headers earlier, so we can run a (currently non existing) C<data_headers>
+hook
+
+=item CRLF
+
+s/\r?\n$/\n/: only in headers ... and in body when BODY=BINARYMIME was not
+given
+
+=back
+
=cut
use Qpsmtpd::DSN;
use POSIX qw(strftime);
+use Fcntl qw(:seek);
+
+our $reader;
sub register {
my ($self, $qp, @args) = @_;
if (@args > 2) {
$self->log(LOGERROR, "Bad parameters for the chunking plugin")
}
+
$self->{_binarymime} = 0;
if (lc $args[0] eq 'binarymime') {
$self->{_binarymime} = 1;
}
+
+ $reader = 'read_block';
+ $self->{_bdat_block} = 4096;
+ if ($qp->{conn} && $qp->{conn}->isa('Apache2::Connection')) {
+ $reader = 'ap_read_block';
+ $self->{_bdat_block} = 8000;
+
+ require APR::Const;
+ APR::Const->import(qw(BLOCK_READ EOF SUCCESS TIMEUP));
+
+ # require APR::Socket;
+ # APR::Socket->import(qw());
+
+ require Apache2::Const;
+ Apache2::Const->import(qw(MODE_READBYTES));
+
+ require Apache2::Connection;
+ Apache2::Connection->import(qw());
+
+ require APR::Error;
+ APR::Error->import(qw());
+ }
}
-sub hook_ehlo {
+sub hook_ehlo { # announce that we're able to do CHUNKING (and BINARYMIME)
my ($self, $transaction) = @_;
my $cap = $transaction->notes('capabilities');
$cap ||= [];
@@ -78,9 +120,10 @@
sub hook_unrecognized_command {
my ($self, $transaction, $cmd, $size) = @_;
- return (DECLINED) unless $cmd eq 'bdat';
- my ($err, $last);
+ return (DECLINED)
+ unless lc($cmd) eq 'bdat';
+ my ($err, $last);
my $msg_size = $transaction->notes('bdat_size') || 0;
# DATA and BDAT commands cannot be used in the same transaction. If a
@@ -107,52 +150,52 @@
($err) =
(Qpsmtpd::DSN->proto_syntax_error("Syntax error in BDAT parameter"))[1];
-
- unless (defined $size || $size =~ /^\d+$/) {
- $self->qp->respond(552, $err);
- return (DONE);
- }
- if ($size =~ /^(\d+)\s*(LAST)?\s*$/) {
+ if ($size =~ /^(\d+)\s*(\S+)?\s*$/) {
$size = $1;
$last = $2;
}
+ unless (defined $size && $size =~ /^\d+$/) {
+ $self->qp->respond(552, $err);
+ return (DONE);
+ }
- if (defined $last) {
- if ($last =~ /^LAST$/i) { # RFC says LAST all upper, we don't care
- $last = 1;
- }
- else {
+ if (!defined($last) or $last =~ /^$/) {
+ $last = $size ? 0 : 1;
+ }
+ else {
+ unless (uc($last) eq 'LAST') { # RFC says LAST all upper, we don't care
$self->qp->respond(552, $err);
return (DONE);
}
- }
- else {
- $last = 0;
- ($last = 1) unless $size;
+ $last = 1;
}
$transaction->notes('bdat_bdat', 1); # remember we've seen BDAT
- # get a file to write the data chunks
- my $file = $transaction->body_filename;
- # ouch :o), don't do this abuse of internals at home kids :P
- my $fh = $transaction->body_fh;
- seek($fh, 0, 2)
+ ## get a file to write the data chunks:
+ # ... open a new temporary file if it does not exist
+ my $file = $transaction->body_filename;
+ my $fh = $transaction->body_fh; # and get the fh for the open file
+ seek($fh, 0, SEEK_END)
or $self->log(LOGERROR, "failed to seek: $!"),
$self->qp->respond(452, "Temporary storage allocation error"),
return (DONE);
- # we're at the end of the file, now read the chunk
- my $rest = $size % 4096;
- my $num = ($size-$rest)/4096;
- my $i = 0;
- my $buffer;
- my $bytes;
-
- while($i < $num) {
- $bytes = read(STDIN, $buffer, 4096);
- if ($bytes != 4096) {
+ # we're at the end of the file, now read the chunk (and write it to $fh)
+ my $sum = 0;
+ my ($buffer, $bytes, $left) = ("", 0, 0);
+
+ my $block = $self->{_bdat_block};
+ my ($rc, $msg);
+ while ($sum < $size) {
+ ($buffer, $bytes, $rc, $msg) = $self->$reader($block);
+ if ($rc) {
+ $self->log(LOGERROR, "Failed to read: $msg");
+ $self->qp->respond($rc, $msg);
+ return (DONE);
+ }
+ if (!defined $buffer or !$bytes) {
$self->log(LOGERROR, "Failed to read: $!");
$self->qp->respond(452, "Error reading your data");
return (DONE);
@@ -161,74 +204,80 @@
or $self->log(LOGERROR, "Failed to write: $!"),
$self->qp->respond(452, "Temporary storage allocation error"),
return (DONE);
- ++$i;
- }
- $bytes = read(STDIN, $buffer, $rest);
- if ($bytes != $rest) {
- $self->log(LOGERROR, "Failed to read: $!");
- $self->qp->respond(452, "Error reading your data");
- return (DONE);
+
+ $sum += $bytes;
+ $left = $size - $sum;
+ $block = ($left < $block) ? $left : $block;
}
- print $fh $buffer
- or $self->log(LOGERROR, "Failed to write: $!"),
- $self->qp->respond(452, "Temporary storage allocation error"),
- return (DONE);
# ok, got the chunk on disk
+ $self->log(LOGDEBUG, "OK, got the chunk of $size bytes, LAST=$last");
# let's see if the mail is too big...
# ...we can't do this before reading the chunk, because the BDAT command
# requires us to read the chunk before responding
my $max = $self->qp->config('databytes');
- if ($max) {
- if (($msg_size + $size) > $max) {
- $self->qp->respond(552, "Message too big!");
- return(DONE);
- }
+ if ($max && (($msg_size + $size) > $max)) {
+ $self->qp->respond(552, "Message too big!");
+ # $self->qp->reset_transaction; ### FIXME: reset here?
+ return(DONE);
}
$transaction->notes('bdat_size', $msg_size + $size);
- if (!$last) { # get the next chunk
+ unless ($last) { # get the next chunk
$self->qp->respond(250, "Ok, got $size octets");
- return (DONE);
+ return(DONE);
}
# else
-
+
# ... get the headers, run data_post & queue hooks
$transaction->notes('bdat_last', 1);
- seek($fh, 0, 0)
+ seek($fh, 0, SEEK_SET)
or $self->log(LOGERROR, "Failed to seek: $!"),
$self->qp->respond(452, "Temporary storage allocation error"),
return (DONE);
$buffer = "";
while (<$fh>) {
- last if /^\r?\n$/;
+ if (/^\r?\n$/) {
+ seek($fh, -length($_), SEEK_CUR);
+ # the body starts here...
+ $self->transaction->set_body_start();
+ last;
+ }
s/\r\n$/\n/;
$buffer .= $_;
+ # if (length($buffer) > 50_000) ;
+ # $self->qp->respond(500, "Header size too large")
+ # return (DONE);
+ # }
}
- # the body starts here...
- $self->transaction->set_body_start();
my $header = Mail::Header->new(Modify => 0, MailFrom => "COERCE");
my @header = split /^/m, $buffer;
+ # undef $buffer;
$header->extract(\@header);
$self->transaction->header($header);
- my $authheader = (defined $self->{_auth} and $self->{_auth} == OK)
- ? "(smtp-auth username $self->{_auth_user}, "
- ."mechanism $self->{_auth_mechanism})\n"
- : "";
-
- # no need for SMPT/ESMTP diff, we know we've just received via ESMTP (EHLO)
- $header->add("Received",
- "from ".$self->connection->remote_info
- # can/should/must this be EHLO instead of HELO?
- ." (HELO ".$self->connection->hello_host.")"
- ." (". $self->connection->remote_ip. ")\n "
- .$authheader
- ." by ".$self->qp->config('me')." (qpsmtpd/".$self->qp->version.") "
- ."with ESMTP". ($authheader ? "A" : "")."; " # ESMPTA: RFC 3848
- .(strftime('%a, %d %b %Y %H:%M:%S %z', localtime)), 0);
+ my $rcvd_line;
+ ($rc, $rcvd_line) = $self->qp->run_hooks("received_line");
+ if ($rc != OK or not $rcvd_line) {
+ my $authheader = (defined $self->{_auth} and $self->{_auth} == OK)
+ ? "\t(smtp-auth username $self->{_auth_user}, "
+ ."mechanism $self->{_auth_mechanism})\n"
+ : "";
+
+ $rcvd_line = "from ".$self->connection->remote_info
+ # can/should/must this be EHLO instead of HELO?
+ ." (HELO ".$self->connection->hello_host.")"
+ ." (". $self->connection->remote_ip. ")\n "
+ .$authheader
+ ."\tby ".$self->qp->config('me')." (qpsmtpd/".$self->qp->version.") "
+ # no need for SMTP/ESMTP diff, we know we've just received
+ # via ESMTP (EHLO)
+ ."with ESMTP". ($authheader ? "A" : "")."; " # ESMTPA: RFC 3848
+ .(strftime('%a, %d %b %Y %H:%M:%S %z', localtime));
+ }
+ $header->add("Received", $rcvd_line, 0);
# everything done for running data_post...
# this will call the spamassassin, virus scanner and queue plugins
@@ -238,11 +287,52 @@
$self->qp->run_hooks("data_post");
# BDAT (0( LAST)?|$num LAST) is always the end of a "transaction"
- # ... doesn't matter if it had done before
- $self->qp->reset_transaction;
+ $self->qp->reset_transaction; # ... doesn't matter if it had done before
return (DONE);
}
+sub ap_read_block {
+ my ($self, $block_size) = @_;
+ my $conn = $self->qp->{conn};
+ return (undef, 0, 452, "You don't see this, your connection is dead")
+ if $conn->aborted;
+
+ my $buffer;
+
+### This does not work if the client does not fetch the response after
+### every BDAT command... why should he fetch it, we're offering PIPELINING
+# my $sock = $conn->client_socket;
+# my $bytes = eval { $sock->recv($buffer, $block_size) };
+# if ($@ && ref $@ && $@ == APR::Const::TIMEUP()) {
+# return (undef, 0, 452, "Timeout reading your data");
+# }
+# return ($buffer, $bytes);
+
+ my $bb = $self->qp->{bb_in};
+ my $rc = $conn->input_filters->get_brigade($bb,
+ Apache2::Const::MODE_READBYTES(),
+ APR::Const::BLOCK_READ(),
+ $block_size);
+ return (undef, 0, 452, "You don't see this, got EOF")
+ if $rc == APR::Const::EOF();
+ die APR::Error::strerror($rc)
+ unless $rc == APR::Const::SUCCESS();
+
+ $bb->flatten($buffer);
+ $bb->cleanup;
+ return ($buffer, length($buffer));
+}
+
+sub read_block {
+ my ($self, $block_size) = @_;
+ my ($bytes, $buffer);
+ $bytes = read(STDIN, $buffer, $block_size);
+
+ return (undef, 0, 452, "Failed to read your data")
+ unless $bytes;
+ return ($buffer, $bytes);
+}
+
sub hook_data {
my ($self, $transaction) = @_;
if ($transaction->notes('bdat_body_binarymime')