stas 2003/01/14 22:07:11
Modified: src/modules/perl modperl_filter.c modperl_filter.h
modperl_types.h
xs/tables/current/ModPerl FunctionTable.pm
Log:
- implementation of the input stream filtering support (1st phase)
- code refactoring so that the code can be re-used for input and output filtering
- proper support for misbehaving feeding filters that send more than one
EOS bucket
Revision Changes Path
1.43 +225 -66 modperl-2.0/src/modules/perl/modperl_filter.c
Index: modperl_filter.c
===================================================================
RCS file: /home/cvs/modperl-2.0/src/modules/perl/modperl_filter.c,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -r1.42 -r1.43
--- modperl_filter.c 12 Jan 2003 02:21:37 -0000 1.42
+++ modperl_filter.c 15 Jan 2003 06:07:10 -0000 1.43
@@ -94,15 +94,23 @@
filter->mode = mode;
filter->f = f;
- filter->bb = bb;
filter->pool = p;
filter->wbucket.pool = p;
filter->wbucket.filters = &f->next;
filter->wbucket.outcnt = 0;
+ if (mode == MP_INPUT_FILTER_MODE) {
+ filter->bb_in = NULL;
+ filter->bb_out = bb;
+ }
+ else {
+ filter->bb_in = bb;
+ filter->bb_out = NULL;
+ }
+
MP_TRACE_f(MP_FUNC, "filter=0x%lx, mode=%s\n",
- (unsigned long)filter, mode == MP_OUTPUT_FILTER_MODE ?
- "output" : "input");
+ (unsigned long)filter,
+ mode == MP_INPUT_FILTER_MODE ? "input" : "output");
return filter;
}
@@ -138,7 +146,10 @@
modperl_handler_make_args(aTHX_ &args,
"Apache::Filter", filter->f,
- "APR::Brigade", filter->bb,
+ "APR::Brigade",
+ (filter->mode == MP_INPUT_FILTER_MODE
+ ? filter->bb_out
+ : filter->bb_in),
NULL);
modperl_filter_mg_set(aTHX_ AvARRAY(args)[0], filter);
@@ -168,26 +179,59 @@
filter->seen_eos = 0;
}
- if (filter->mode == MP_OUTPUT_FILTER_MODE) {
+ if (filter->mode == MP_INPUT_FILTER_MODE) {
+ if (filter->bb_in) {
+ /* in the streaming mode filter->bb_in is populated on the
+ * first modperl_input_filter_read, so it must be
+ * destroyed at the end of the filter invocation
+ */
+ /* XXX: may be the filter must consume all the data? add a
+ * test to check */
+ apr_brigade_destroy(filter->bb_in);
+ filter->bb_in = NULL;
+ }
+ modperl_input_filter_flush(filter);
+ }
+ else {
modperl_output_filter_flush(filter);
}
+
return status;
}
/* output filters */
-MP_INLINE static apr_status_t send_eos(ap_filter_t *f)
+MP_INLINE static apr_status_t send_input_eos(modperl_filter_t *filter)
+{
+ apr_bucket_alloc_t *ba = filter->f->c->bucket_alloc;
+ apr_bucket *b = apr_bucket_eos_create(ba);
+ APR_BRIGADE_INSERT_TAIL(filter->bb_out, b);
+ ((modperl_filter_ctx_t *)filter->f->ctx)->sent_eos = 1;
+ return APR_SUCCESS;
+
+}
+
+MP_INLINE static apr_status_t send_input_flush(modperl_filter_t *filter)
+{
+ apr_bucket_alloc_t *ba = filter->f->c->bucket_alloc;
+ apr_bucket *b = apr_bucket_flush_create(ba);
+ APR_BRIGADE_INSERT_TAIL(filter->bb_out, b);
+ return APR_SUCCESS;
+}
+
+MP_INLINE static apr_status_t send_output_eos(ap_filter_t *f)
{
apr_bucket_alloc_t *ba = f->c->bucket_alloc;
apr_bucket_brigade *bb = apr_brigade_create(MP_FILTER_POOL(f),
ba);
apr_bucket *b = apr_bucket_eos_create(ba);
APR_BRIGADE_INSERT_TAIL(bb, b);
+ ((modperl_filter_ctx_t *)f->ctx)->sent_eos = 1;
return ap_pass_brigade(f->next, bb);
}
-MP_INLINE static apr_status_t send_flush(ap_filter_t *f)
+MP_INLINE static apr_status_t send_output_flush(ap_filter_t *f)
{
apr_bucket_alloc_t *ba = f->c->bucket_alloc;
apr_bucket_brigade *bb = apr_brigade_create(MP_FILTER_POOL(f),
@@ -199,11 +243,14 @@
/* unrolled APR_BRIGADE_FOREACH loop */
+#define MP_FILTER_EMPTY(filter) \
+APR_BRIGADE_EMPTY(filter->bb_in)
+
#define MP_FILTER_SENTINEL(filter) \
-APR_BRIGADE_SENTINEL(filter->bb)
+APR_BRIGADE_SENTINEL(filter->bb_in)
#define MP_FILTER_FIRST(filter) \
-APR_BRIGADE_FIRST(filter->bb)
+APR_BRIGADE_FIRST(filter->bb_in)
#define MP_FILTER_NEXT(filter) \
APR_BUCKET_NEXT(filter->bucket)
@@ -216,52 +263,83 @@
MP_INLINE static int get_bucket(modperl_filter_t *filter)
{
- if (!filter->bb) {
+ if (!filter->bb_in || MP_FILTER_EMPTY(filter)) {
+ MP_TRACE_f(MP_FUNC, "%s filter bucket brigade is empty\n",
+ (filter->mode == MP_INPUT_FILTER_MODE ? "input" : "output"));
return 0;
}
+
if (!filter->bucket) {
filter->bucket = MP_FILTER_FIRST(filter);
- return 1;
- }
- else if (MP_FILTER_IS_EOS(filter)) {
- MP_TRACE_f(MP_FUNC, "received EOS bucket\n");
- filter->seen_eos = 1;
- return 1;
}
else if (filter->bucket != MP_FILTER_SENTINEL(filter)) {
filter->bucket = MP_FILTER_NEXT(filter);
- if (filter->bucket == MP_FILTER_SENTINEL(filter)) {
- apr_brigade_destroy(filter->bb);
- filter->bb = NULL;
- return 0;
- }
- else {
- return 1;
- }
}
- return 0;
+ if (filter->bucket == MP_FILTER_SENTINEL(filter)) {
+ filter->bucket = NULL;
+ /* can't destroy bb_in since the next read will need a brigade
+ * to try to read from */
+ apr_brigade_cleanup(filter->bb_in);
+ return 0;
+ }
+
+ if (MP_FILTER_IS_EOS(filter)) {
+ MP_TRACE_f(MP_FUNC, "%s filter received EOS bucket\n",
+ (filter->mode == MP_INPUT_FILTER_MODE
+ ? "input" : "output"));
+
+ filter->seen_eos = 1;
+ /* there should be only one EOS sent, modperl_filter_read will
+ * not come here, since filter->seen_eos is set
+ */
+ return 0;
+ }
+ else if (MP_FILTER_IS_FLUSH(filter)) {
+ MP_TRACE_f(MP_FUNC, "%s filter received FLUSH bucket\n",
+ (filter->mode == MP_INPUT_FILTER_MODE
+ ? "input" : "output"));
+
+ filter->flush = 1;
+ return 0;
+ }
+ else {
+ return 1;
+ }
}
-MP_INLINE apr_size_t modperl_output_filter_read(pTHX_
+
+MP_INLINE static apr_size_t modperl_filter_read(pTHX_
modperl_filter_t *filter,
SV *buffer,
apr_size_t wanted)
{
int num_buckets = 0;
apr_size_t len = 0;
-
+
(void)SvUPGRADE(buffer, SVt_PV);
SvPOK_only(buffer);
SvCUR(buffer) = 0;
- /*modperl_brigade_dump(filter->bb);*/
+ /* sometimes the EOS bucket arrives in the same brigade with other
+ * buckets, so that particular read() will not return 0 and will
+ * be called again if called in the while ($filter->read(...))
+ * loop. In that case we return 0.
+ */
+ if (filter->seen_eos) {
+ return 0;
+ }
+
+ /*modperl_brigade_dump(filter->bb_in, stderr);*/
- MP_TRACE_f(MP_FUNC, "caller wants %d bytes\n", wanted);
+ MP_TRACE_f(MP_FUNC, "%s filter wants %d bytes\n",
+ (filter->mode == MP_INPUT_FILTER_MODE ? "input" : "output"),
+ wanted);
if (filter->remaining) {
if (filter->remaining >= wanted) {
- MP_TRACE_f(MP_FUNC, "eating %d of remaining %d leftover bytes\n",
+ MP_TRACE_f(MP_FUNC,
+ "eating/returning %d of remaining %d leftover bytes\n",
wanted, filter->remaining);
sv_catpvn(buffer, filter->leftover, wanted);
filter->leftover += wanted;
@@ -278,11 +356,6 @@
}
}
- if (!filter->bb) {
- MP_TRACE_f(MP_FUNC, "bucket brigade has been emptied\n");
- return 0;
- }
-
while (1) {
const char *buf;
apr_size_t buf_len;
@@ -291,17 +364,6 @@
break;
}
- if (MP_FILTER_IS_EOS(filter)) {
- MP_TRACE_f(MP_FUNC, "received EOS bucket\n");
- filter->seen_eos = 1;
- break;
- }
- else if (MP_FILTER_IS_FLUSH(filter)) {
- MP_TRACE_f(MP_FUNC, "received FLUSH bucket\n");
- filter->flush = 1;
- break;
- }
-
num_buckets++;
filter->rc = apr_bucket_read(filter->bucket, &buf, &buf_len, 0);
@@ -336,17 +398,59 @@
}
}
-#ifdef MP_TRACE
- if (num_buckets) {
- MP_TRACE_f(MP_FUNC,
- "returning %d bytes from %d bucket%s "
- "(%d bytes leftover)\n",
- len, num_buckets, ((num_buckets > 1) ? "s" : ""),
- filter->remaining);
+ MP_TRACE_f(MP_FUNC,
+ "returning %d bytes from %d bucket%s "
+ "(%d bytes leftover)\n",
+ len, num_buckets, ((num_buckets == 1) ? "" : "s"),
+ filter->remaining);
+
+ return len;
+}
+
+MP_INLINE apr_size_t modperl_input_filter_read(pTHX_
+ modperl_filter_t *filter,
+ ap_input_mode_t mode,
+ apr_read_type_e block,
+ apr_off_t readbytes,
+ SV *buffer,
+ apr_size_t wanted)
+{
+ apr_size_t len = 0;
+
+ if (!filter->bb_in) {
+ /* This should be read only once per handler invocation! */
+ filter->bb_in = apr_brigade_create(filter->pool,
+ filter->f->c->bucket_alloc);
+ ap_get_brigade(filter->f->next, filter->bb_in, mode, block, readbytes);
+ MP_TRACE_f(MP_FUNC, "retrieving bb: 0x%lx\n",
+ (unsigned long)(filter->bb_in));
+ }
+
+ len = modperl_filter_read(aTHX_ filter, buffer, wanted);
+
+/* if (APR_BRIGADE_EMPTY(filter->bb_in)) { */
+/* apr_brigade_destroy(filter->bb_in); */
+/* filter->bb_in = NULL; */
+/* } */
+
+ if (filter->flush && len == 0) {
+ /* if len > 0 then $filter->write will flush */
+ modperl_input_filter_flush(filter);
}
-#endif
- if ((filter->eos || filter->flush) && (len == 0)) {
+ return len;
+}
+
+
+MP_INLINE apr_size_t modperl_output_filter_read(pTHX_
+ modperl_filter_t *filter,
+ SV *buffer,
+ apr_size_t wanted)
+{
+ apr_size_t len = 0;
+ len = modperl_filter_read(aTHX_ filter, buffer, wanted);
+
+ if (filter->flush && len == 0) {
/* if len > 0 then $filter->write will flush */
modperl_output_filter_flush(filter);
}
@@ -354,8 +458,33 @@
return len;
}
+
+MP_INLINE apr_status_t modperl_input_filter_flush(modperl_filter_t *filter)
+{
+ if (((modperl_filter_ctx_t *)filter->f->ctx)->sent_eos) {
+ /* no data should be sent after EOS has been sent */
+ return filter->rc;
+ }
+
+ if (filter->eos || filter->flush) {
+ MP_TRACE_f(MP_FUNC, "sending %s bucket\n",
+ filter->eos ? "EOS" : "FLUSH");
+ filter->rc = filter->eos ?
+ send_input_eos(filter) : send_input_flush(filter);
+ /* modperl_brigade_dump(filter->bb_out, stderr); */
+ filter->flush = filter->eos = 0;
+ }
+
+ return filter->rc;
+}
+
MP_INLINE apr_status_t modperl_output_filter_flush(modperl_filter_t *filter)
{
+ if (((modperl_filter_ctx_t *)filter->f->ctx)->sent_eos) {
+ /* no data should be sent after EOS has been sent */
+ return filter->rc;
+ }
+
filter->rc = modperl_wbucket_flush(&filter->wbucket);
if (filter->rc != APR_SUCCESS) {
return filter->rc;
@@ -365,10 +494,10 @@
MP_TRACE_f(MP_FUNC, "sending %s bucket\n",
filter->eos ? "EOS" : "FLUSH");
filter->rc = filter->eos ?
- send_eos(filter->f) : send_flush(filter->f);
- if (filter->bb) {
- apr_brigade_destroy(filter->bb);
- filter->bb = NULL;
+ send_output_eos(filter->f) : send_output_flush(filter->f);
+ if (filter->bb_in) {
+ apr_brigade_destroy(filter->bb_in);
+ filter->bb_in = NULL;
}
filter->flush = filter->eos = 0;
}
@@ -376,6 +505,20 @@
return filter->rc;
}
+MP_INLINE apr_status_t modperl_input_filter_write(modperl_filter_t *filter,
+ const char *buf,
+ apr_size_t *len)
+{
+ apr_bucket_alloc_t *ba = filter->f->c->bucket_alloc;
+ char *copy = apr_pstrndup(filter->pool, buf, *len);
+ apr_bucket *bucket = apr_bucket_transient_create(copy, *len, ba);
+ /* MP_TRACE_f(MP_FUNC, "writing %d bytes: %s\n", *len, copy); */
+ MP_TRACE_f(MP_FUNC, "writing %d bytes:\n", *len);
+ APR_BRIGADE_INSERT_TAIL(filter->bb_out, bucket);
+ /* modperl_brigade_dump(filter->bb_out, stderr); */
+ return APR_SUCCESS;
+}
+
MP_INLINE apr_status_t modperl_output_filter_write(modperl_filter_t *filter,
const char *buf,
apr_size_t *len)
@@ -389,9 +532,16 @@
modperl_filter_t *filter;
int status;
- filter = modperl_filter_new(f, bb, MP_OUTPUT_FILTER_MODE);
- status = modperl_run_filter(filter, 0, 0, 0);
-
+ if (((modperl_filter_ctx_t *)f->ctx)->sent_eos) {
+ MP_TRACE_f(MP_FUNC,
+ "EOS was already sent, passing through the brigade\n");
+ return ap_pass_brigade(f->next, bb);
+ }
+ else {
+ filter = modperl_filter_new(f, bb, MP_OUTPUT_FILTER_MODE);
+ status = modperl_run_filter(filter, 0, 0, 0);
+ }
+
switch (status) {
case OK:
return APR_SUCCESS;
@@ -411,9 +561,16 @@
modperl_filter_t *filter;
int status;
- filter = modperl_filter_new(f, bb, MP_INPUT_FILTER_MODE);
- status = modperl_run_filter(filter, mode, block, readbytes);
-
+ if (((modperl_filter_ctx_t *)f->ctx)->sent_eos) {
+ MP_TRACE_f(MP_FUNC,
+ "EOS was already sent, passing through the brigade\n");
+ return ap_get_brigade(f->next, bb, mode, block, readbytes);
+ }
+ else {
+ filter = modperl_filter_new(f, bb, MP_INPUT_FILTER_MODE);
+ status = modperl_run_filter(filter, mode, block, readbytes);
+ }
+
switch (status) {
case OK:
case DECLINED:
@@ -445,7 +602,7 @@
if (!(handlers[i]->attrs & MP_FILTER_CONNECTION_HANDLER)) {
MP_TRACE_f(MP_FUNC,
- "%s is not an FilterConnection handler\n",
+ "%s is not a FilterConnection handler\n",
handlers[i]->name);
continue;
}
@@ -585,6 +742,8 @@
(unsigned long)bucket,
(long)bucket->length,
(unsigned long)bucket->data);
+ /* fprintf(fp, " : %s\n", (char *)bucket->data); */
+
i++;
}
#endif
1.16 +15 -0 modperl-2.0/src/modules/perl/modperl_filter.h
Index: modperl_filter.h
===================================================================
RCS file: /home/cvs/modperl-2.0/src/modules/perl/modperl_filter.h,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- modperl_filter.h 12 Jan 2003 02:15:58 -0000 1.15
+++ modperl_filter.h 15 Jan 2003 06:07:10 -0000 1.16
@@ -42,6 +42,8 @@
void modperl_output_filter_add_request(request_rec *r);
MP_INLINE apr_status_t modperl_output_filter_flush(modperl_filter_t *filter);
+MP_INLINE apr_status_t modperl_input_filter_flush(modperl_filter_t *filter);
+
MP_INLINE apr_size_t modperl_output_filter_read(pTHX_
modperl_filter_t *filter,
@@ -64,5 +66,18 @@
void modperl_input_filter_add_connection(conn_rec *c);
void modperl_input_filter_add_request(request_rec *r);
+
+MP_INLINE apr_size_t modperl_input_filter_read(pTHX_
+ modperl_filter_t *filter,
+ ap_input_mode_t mode,
+ apr_read_type_e block,
+ apr_off_t readbytes,
+ SV *buffer,
+ apr_size_t wanted);
+
+MP_INLINE apr_status_t modperl_input_filter_write(modperl_filter_t *filter,
+ const char *buf,
+ apr_size_t *len);
+
#endif /* MODPERL_FILTER_H */
1.64 +3 -1 modperl-2.0/src/modules/perl/modperl_types.h
Index: modperl_types.h
===================================================================
RCS file: /home/cvs/modperl-2.0/src/modules/perl/modperl_types.h,v
retrieving revision 1.63
retrieving revision 1.64
diff -u -r1.63 -r1.64
--- modperl_types.h 25 Nov 2002 22:46:29 -0000 1.63
+++ modperl_types.h 15 Jan 2003 06:07:10 -0000 1.64
@@ -192,13 +192,15 @@
apr_ssize_t remaining;
modperl_wbucket_t wbucket;
apr_bucket *bucket;
- apr_bucket_brigade *bb;
+ apr_bucket_brigade *bb_in;
+ apr_bucket_brigade *bb_out;
apr_status_t rc;
modperl_filter_mode_e mode;
apr_pool_t *pool;
} modperl_filter_t;
typedef struct {
+ int sent_eos;
SV *data;
modperl_handler_t *handler;
PerlInterpreter *perl;
1.94 +71 -0 modperl-2.0/xs/tables/current/ModPerl/FunctionTable.pm
Index: FunctionTable.pm
===================================================================
RCS file: /home/cvs/modperl-2.0/xs/tables/current/ModPerl/FunctionTable.pm,v
retrieving revision 1.93
retrieving revision 1.94
diff -u -r1.93 -r1.94
--- FunctionTable.pm 12 Jan 2003 02:33:28 -0000 1.93
+++ FunctionTable.pm 15 Jan 2003 06:07:10 -0000 1.94
@@ -3138,6 +3138,19 @@
},
{
'return_type' => 'apr_status_t',
+ 'name' => 'modperl_input_filter_flush',
+ 'attr' => [
+ '__inline__'
+ ],
+ 'args' => [
+ {
+ 'type' => 'modperl_filter_t *',
+ 'name' => 'filter'
+ }
+ ]
+ },
+ {
+ 'return_type' => 'apr_status_t',
'name' => 'modperl_output_filter_handler',
'args' => [
{
@@ -3176,6 +3189,43 @@
]
},
{
+ 'return_type' => 'apr_size_t',
+ 'name' => 'modperl_input_filter_read',
+ 'attr' => [
+ '__inline__'
+ ],
+ 'args' => [
+ {
+ 'type' => 'PerlInterpreter *',
+ 'name' => 'my_perl'
+ },
+ {
+ 'type' => 'modperl_filter_t *',
+ 'name' => 'filter'
+ },
+ {
+ 'type' => 'ap_input_mode_t',
+ 'name' => 'mode'
+ },
+ {
+ 'type' => 'apr_read_type_e',
+ 'name' => 'block'
+ },
+ {
+ 'type' => 'apr_off_t',
+ 'name' => 'readbytes'
+ },
+ {
+ 'type' => 'SV *',
+ 'name' => 'buffer'
+ },
+ {
+ 'type' => 'apr_size_t',
+ 'name' => 'wanted'
+ }
+ ]
+ },
+ {
'return_type' => 'void',
'name' => 'modperl_output_filter_add_connection',
'args' => [
@@ -3198,6 +3248,27 @@
{
'return_type' => 'apr_status_t',
'name' => 'modperl_output_filter_write',
+ 'attr' => [
+ '__inline__'
+ ],
+ 'args' => [
+ {
+ 'type' => 'modperl_filter_t *',
+ 'name' => 'filter'
+ },
+ {
+ 'type' => 'const char *',
+ 'name' => 'buf'
+ },
+ {
+ 'type' => 'apr_size_t *',
+ 'name' => 'len'
+ }
+ ]
+ },
+ {
+ 'return_type' => 'apr_status_t',
+ 'name' => 'modperl_input_filter_write',
'attr' => [
'__inline__'
],