stas        2003/01/14 22:07:11

  Modified:    src/modules/perl modperl_filter.c modperl_filter.h
                        modperl_types.h
               xs/tables/current/ModPerl FunctionTable.pm
  Log:
  - implementation of the input stream filtering support (1st phase)
  - code refactoring to allow re-use for input and output filtering
  - proper support for mis-behaved feeding filters that send more than one
  EOS bucket
  
  Revision  Changes    Path
  1.43      +225 -66   modperl-2.0/src/modules/perl/modperl_filter.c
  
  Index: modperl_filter.c
  ===================================================================
  RCS file: /home/cvs/modperl-2.0/src/modules/perl/modperl_filter.c,v
  retrieving revision 1.42
  retrieving revision 1.43
  diff -u -r1.42 -r1.43
  --- modperl_filter.c  12 Jan 2003 02:21:37 -0000      1.42
  +++ modperl_filter.c  15 Jan 2003 06:07:10 -0000      1.43
  @@ -94,15 +94,23 @@
   
       filter->mode = mode;
       filter->f = f;
  -    filter->bb = bb;
       filter->pool = p;
       filter->wbucket.pool = p;
       filter->wbucket.filters = &f->next;
       filter->wbucket.outcnt = 0;
   
  +    if (mode == MP_INPUT_FILTER_MODE) {
  +        filter->bb_in  = NULL;
  +        filter->bb_out = bb;
  +    }
  +    else {
  +        filter->bb_in  = bb;
  +        filter->bb_out = NULL;
  +    }
  +    
       MP_TRACE_f(MP_FUNC, "filter=0x%lx, mode=%s\n",
  -               (unsigned long)filter, mode == MP_OUTPUT_FILTER_MODE ?
  -               "output" : "input");
  +               (unsigned long)filter,
  +               mode == MP_INPUT_FILTER_MODE ? "input" : "output");
   
       return filter;
   }
  @@ -138,7 +146,10 @@
   
       modperl_handler_make_args(aTHX_ &args,
                                 "Apache::Filter", filter->f,
  -                              "APR::Brigade", filter->bb,
  +                              "APR::Brigade",
  +                              (filter->mode == MP_INPUT_FILTER_MODE
  +                               ? filter->bb_out
  +                               : filter->bb_in),
                                 NULL);
   
       modperl_filter_mg_set(aTHX_ AvARRAY(args)[0], filter);
  @@ -168,26 +179,59 @@
           filter->seen_eos = 0;
       }
   
  -    if (filter->mode == MP_OUTPUT_FILTER_MODE) {
  +    if (filter->mode == MP_INPUT_FILTER_MODE) {
  +        if (filter->bb_in) {
  +            /* in the streaming mode filter->bb_in is populated on the
  +             * first modperl_input_filter_read, so it must be
  +             * destroyed at the end of the filter invocation
  +             */
  +            /* XXX: may be the filter must consume all the data? add a
  +             * test to check */
  +            apr_brigade_destroy(filter->bb_in);
  +            filter->bb_in = NULL;
  +        }
  +        modperl_input_filter_flush(filter);
  +    }
  +    else {
           modperl_output_filter_flush(filter);
       }
  +    
   
       return status;
   }
   
   /* output filters */
   
  -MP_INLINE static apr_status_t send_eos(ap_filter_t *f)
  +MP_INLINE static apr_status_t send_input_eos(modperl_filter_t *filter)
  +{
  +    apr_bucket_alloc_t *ba = filter->f->c->bucket_alloc;
  +    apr_bucket *b = apr_bucket_eos_create(ba);
  +    APR_BRIGADE_INSERT_TAIL(filter->bb_out, b);
  +    ((modperl_filter_ctx_t *)filter->f->ctx)->sent_eos = 1;
  +    return APR_SUCCESS;
  +    
  +}
  +
  +MP_INLINE static apr_status_t send_input_flush(modperl_filter_t *filter)
  +{
  +    apr_bucket_alloc_t *ba = filter->f->c->bucket_alloc;
  +    apr_bucket *b = apr_bucket_flush_create(ba);
  +    APR_BRIGADE_INSERT_TAIL(filter->bb_out, b);
  +    return APR_SUCCESS;
  +}
  +
  +MP_INLINE static apr_status_t send_output_eos(ap_filter_t *f)
   {
       apr_bucket_alloc_t *ba = f->c->bucket_alloc;
       apr_bucket_brigade *bb = apr_brigade_create(MP_FILTER_POOL(f),
                                                   ba);
       apr_bucket *b = apr_bucket_eos_create(ba);
       APR_BRIGADE_INSERT_TAIL(bb, b);
  +    ((modperl_filter_ctx_t *)f->ctx)->sent_eos = 1;
       return ap_pass_brigade(f->next, bb);
   }
   
  -MP_INLINE static apr_status_t send_flush(ap_filter_t *f)
  +MP_INLINE static apr_status_t send_output_flush(ap_filter_t *f)
   {
       apr_bucket_alloc_t *ba = f->c->bucket_alloc;
       apr_bucket_brigade *bb = apr_brigade_create(MP_FILTER_POOL(f),
  @@ -199,11 +243,14 @@
   
   /* unrolled APR_BRIGADE_FOREACH loop */
   
  +#define MP_FILTER_EMPTY(filter) \
  +APR_BRIGADE_EMPTY(filter->bb_in)
  +
   #define MP_FILTER_SENTINEL(filter) \
  -APR_BRIGADE_SENTINEL(filter->bb)
  +APR_BRIGADE_SENTINEL(filter->bb_in)
   
   #define MP_FILTER_FIRST(filter) \
  -APR_BRIGADE_FIRST(filter->bb)
  +APR_BRIGADE_FIRST(filter->bb_in)
   
   #define MP_FILTER_NEXT(filter) \
   APR_BUCKET_NEXT(filter->bucket)
  @@ -216,52 +263,83 @@
   
   MP_INLINE static int get_bucket(modperl_filter_t *filter)
   {
  -    if (!filter->bb) {
  +    if (!filter->bb_in || MP_FILTER_EMPTY(filter)) {
  +        MP_TRACE_f(MP_FUNC, "%s filter bucket brigade is empty\n",
  +               (filter->mode == MP_INPUT_FILTER_MODE ? "input" : "output"));
           return 0;
       }
  +    
       if (!filter->bucket) {
           filter->bucket = MP_FILTER_FIRST(filter);
  -        return 1;
  -    }
  -    else if (MP_FILTER_IS_EOS(filter)) {
  -        MP_TRACE_f(MP_FUNC, "received EOS bucket\n");
  -        filter->seen_eos = 1;
  -        return 1;
       }
       else if (filter->bucket != MP_FILTER_SENTINEL(filter)) {
           filter->bucket = MP_FILTER_NEXT(filter);
  -        if (filter->bucket == MP_FILTER_SENTINEL(filter)) {
  -            apr_brigade_destroy(filter->bb);
  -            filter->bb = NULL;
  -            return 0;
  -        }
  -        else {
  -            return 1;
  -        }
       }
   
  -    return 0;
  +    if (filter->bucket == MP_FILTER_SENTINEL(filter)) {
  +        filter->bucket = NULL;
  +        /* can't destroy bb_in since the next read will need a brigade
  +         * to try to read from */
  +        apr_brigade_cleanup(filter->bb_in);
  +        return 0;
  +    }
  +    
  +    if (MP_FILTER_IS_EOS(filter)) {
  +        MP_TRACE_f(MP_FUNC, "%s filter received EOS bucket\n",
  +                   (filter->mode == MP_INPUT_FILTER_MODE
  +                    ? "input" : "output"));
  +
  +        filter->seen_eos = 1;
  +        /* there should be only one EOS sent, modperl_filter_read will
  +         * not come here, since filter->seen_eos is set
  +         */
  +        return 0;
  +    }
  +    else if (MP_FILTER_IS_FLUSH(filter)) {
  +        MP_TRACE_f(MP_FUNC, "%s filter received FLUSH bucket\n",
  +                   (filter->mode == MP_INPUT_FILTER_MODE
  +                    ? "input" : "output"));
  +
  +        filter->flush = 1;
  +        return 0;
  +    }
  +    else {
  +        return 1;
  +    }
   }
   
  -MP_INLINE apr_size_t modperl_output_filter_read(pTHX_
  +
  +MP_INLINE static apr_size_t modperl_filter_read(pTHX_
                                                   modperl_filter_t *filter,
                                                   SV *buffer,
                                                   apr_size_t wanted)
   {
       int num_buckets = 0;
       apr_size_t len = 0;
  -
  +    
       (void)SvUPGRADE(buffer, SVt_PV);
       SvPOK_only(buffer);
       SvCUR(buffer) = 0;
   
  -    /*modperl_brigade_dump(filter->bb);*/
  +    /* sometimes the EOS bucket arrives in the same brigade with other
  +     * buckets, so that particular read() will not return 0 and will
  +     * be called again if called in the while ($filter->read(...))
  +     * loop. In that case we return 0.
  +     */
  +    if (filter->seen_eos) {
  +        return 0;
  +    }
  +    
  +    /*modperl_brigade_dump(filter->bb_in, stderr);*/
   
  -    MP_TRACE_f(MP_FUNC, "caller wants %d bytes\n", wanted);
  +    MP_TRACE_f(MP_FUNC, "%s filter wants %d bytes\n",
  +               (filter->mode == MP_INPUT_FILTER_MODE ? "input" : "output"),
  +               wanted);
   
       if (filter->remaining) {
           if (filter->remaining >= wanted) {
  -            MP_TRACE_f(MP_FUNC, "eating %d of remaining %d leftover bytes\n",
  +            MP_TRACE_f(MP_FUNC,
  +                       "eating/returning %d of remaining %d leftover bytes\n",
                          wanted, filter->remaining);
               sv_catpvn(buffer, filter->leftover, wanted);
               filter->leftover += wanted;
  @@ -278,11 +356,6 @@
           }
       }
   
  -    if (!filter->bb) {
  -        MP_TRACE_f(MP_FUNC, "bucket brigade has been emptied\n");
  -        return 0;
  -    }
  -
       while (1) {
           const char *buf;
           apr_size_t buf_len;
  @@ -291,17 +364,6 @@
               break;
           }
   
  -        if (MP_FILTER_IS_EOS(filter)) {
  -            MP_TRACE_f(MP_FUNC, "received EOS bucket\n");
  -            filter->seen_eos = 1;
  -            break;
  -        }
  -        else if (MP_FILTER_IS_FLUSH(filter)) {
  -            MP_TRACE_f(MP_FUNC, "received FLUSH bucket\n");
  -            filter->flush = 1;
  -            break;
  -        }
  -
           num_buckets++;
   
           filter->rc = apr_bucket_read(filter->bucket, &buf, &buf_len, 0);
  @@ -336,17 +398,59 @@
           }
       }
   
  -#ifdef MP_TRACE
  -    if (num_buckets) {
  -        MP_TRACE_f(MP_FUNC,
  -                   "returning %d bytes from %d bucket%s "
  -                   "(%d bytes leftover)\n",
  -                   len, num_buckets, ((num_buckets > 1) ? "s" : ""),
  -                   filter->remaining);
  +    MP_TRACE_f(MP_FUNC,
  +               "returning %d bytes from %d bucket%s "
  +               "(%d bytes leftover)\n",
  +               len, num_buckets, ((num_buckets == 1) ? "" : "s"),
  +               filter->remaining);
  +
  +    return len;
  +}
  +
  +MP_INLINE apr_size_t modperl_input_filter_read(pTHX_
  +                                               modperl_filter_t *filter,
  +                                               ap_input_mode_t mode,
  +                                               apr_read_type_e block,
  +                                               apr_off_t readbytes,
  +                                               SV *buffer,
  +                                               apr_size_t wanted)
  +{
  +    apr_size_t len = 0;
  +
  +    if (!filter->bb_in) {
  +        /* This should be read only once per handler invocation! */
  +        filter->bb_in = apr_brigade_create(filter->pool,
  +                                           filter->f->c->bucket_alloc);
  +        ap_get_brigade(filter->f->next, filter->bb_in, mode, block, readbytes);
  +        MP_TRACE_f(MP_FUNC, "retrieving bb: 0x%lx\n",
  +                   (unsigned long)(filter->bb_in));
  +    }
  +
  +    len = modperl_filter_read(aTHX_ filter, buffer, wanted);
  +
  +/*     if (APR_BRIGADE_EMPTY(filter->bb_in)) { */
  +/*         apr_brigade_destroy(filter->bb_in); */
  +/*         filter->bb_in = NULL; */
  +/*     } */
  +
  +    if (filter->flush && len == 0) {
  +        /* if len > 0 then $filter->write will flush */
  +        modperl_input_filter_flush(filter);
       }
  -#endif
   
  -    if ((filter->eos || filter->flush) && (len == 0)) {
  +    return len;
  +}
  +
  +
  +MP_INLINE apr_size_t modperl_output_filter_read(pTHX_
  +                                                modperl_filter_t *filter,
  +                                                SV *buffer,
  +                                                apr_size_t wanted)
  +{
  +    apr_size_t len = 0;
  +    len = modperl_filter_read(aTHX_ filter, buffer, wanted);
  +    
  +    if (filter->flush && len == 0) {
           /* if len > 0 then $filter->write will flush */
           modperl_output_filter_flush(filter);
       }
  @@ -354,8 +458,33 @@
       return len;
   }
   
  +
  +MP_INLINE apr_status_t modperl_input_filter_flush(modperl_filter_t *filter)
  +{
  +    if (((modperl_filter_ctx_t *)filter->f->ctx)->sent_eos) {
  +        /* no data should be sent after EOS has been sent */
  +        return filter->rc;
  +    }
  +    
  +    if (filter->eos || filter->flush) {
  +        MP_TRACE_f(MP_FUNC, "sending %s bucket\n",
  +                   filter->eos ? "EOS" : "FLUSH");
  +        filter->rc = filter->eos ?
  +            send_input_eos(filter) : send_input_flush(filter);
  +        /* modperl_brigade_dump(filter->bb_out, stderr); */
  +        filter->flush = filter->eos = 0;
  +    }
  +    
  +    return filter->rc;
  +}
  +
   MP_INLINE apr_status_t modperl_output_filter_flush(modperl_filter_t *filter)
   {
  +    if (((modperl_filter_ctx_t *)filter->f->ctx)->sent_eos) {
  +        /* no data should be sent after EOS has been sent */
  +        return filter->rc;
  +    }
  +
       filter->rc = modperl_wbucket_flush(&filter->wbucket);
       if (filter->rc != APR_SUCCESS) {
           return filter->rc;
  @@ -365,10 +494,10 @@
           MP_TRACE_f(MP_FUNC, "sending %s bucket\n",
                      filter->eos ? "EOS" : "FLUSH");
           filter->rc = filter->eos ?
  -            send_eos(filter->f) : send_flush(filter->f);
  -        if (filter->bb) {
  -            apr_brigade_destroy(filter->bb);
  -            filter->bb = NULL;
  +            send_output_eos(filter->f) : send_output_flush(filter->f);
  +        if (filter->bb_in) {
  +            apr_brigade_destroy(filter->bb_in);
  +            filter->bb_in = NULL;
           }
           filter->flush = filter->eos = 0;
       }
  @@ -376,6 +505,20 @@
       return filter->rc;
   }
   
  +MP_INLINE apr_status_t modperl_input_filter_write(modperl_filter_t *filter,
  +                                                  const char *buf,
  +                                                  apr_size_t *len)
  +{
  +    apr_bucket_alloc_t *ba = filter->f->c->bucket_alloc;
  +    char *copy = apr_pstrndup(filter->pool, buf, *len);
  +    apr_bucket *bucket = apr_bucket_transient_create(copy, *len, ba);
  +    /* MP_TRACE_f(MP_FUNC, "writing %d bytes: %s\n", *len, copy); */
  +    MP_TRACE_f(MP_FUNC, "writing %d bytes:\n", *len);
  +    APR_BRIGADE_INSERT_TAIL(filter->bb_out, bucket);
  +    /* modperl_brigade_dump(filter->bb_out, stderr); */
  +    return APR_SUCCESS;
  +}
  +
   MP_INLINE apr_status_t modperl_output_filter_write(modperl_filter_t *filter,
                                                      const char *buf,
                                                      apr_size_t *len)
  @@ -389,9 +532,16 @@
       modperl_filter_t *filter;
       int status;
   
  -    filter = modperl_filter_new(f, bb, MP_OUTPUT_FILTER_MODE);
  -    status = modperl_run_filter(filter, 0, 0, 0);
  -
  +    if (((modperl_filter_ctx_t *)f->ctx)->sent_eos) {
  +        MP_TRACE_f(MP_FUNC,
  +                   "EOS was already sent, passing through the brigade\n");
  +        return ap_pass_brigade(f->next, bb);
  +    }
  +    else {
  +        filter = modperl_filter_new(f, bb, MP_OUTPUT_FILTER_MODE);
  +        status = modperl_run_filter(filter, 0, 0, 0);
  +    }
  +    
       switch (status) {
         case OK:
           return APR_SUCCESS;
  @@ -411,9 +561,16 @@
       modperl_filter_t *filter;
       int status;
   
  -    filter = modperl_filter_new(f, bb, MP_INPUT_FILTER_MODE);
  -    status = modperl_run_filter(filter, mode, block, readbytes);
  -
  +    if (((modperl_filter_ctx_t *)f->ctx)->sent_eos) {
  +        MP_TRACE_f(MP_FUNC,
  +                   "EOS was already sent, passing through the brigade\n");
  +        return ap_get_brigade(f->next, bb, mode, block, readbytes);
  +    }
  +    else {
  +        filter = modperl_filter_new(f, bb, MP_INPUT_FILTER_MODE);
  +        status = modperl_run_filter(filter, mode, block, readbytes);
  +    }
  +    
       switch (status) {
         case OK:
         case DECLINED:
  @@ -445,7 +602,7 @@
   
               if (!(handlers[i]->attrs & MP_FILTER_CONNECTION_HANDLER)) {
                   MP_TRACE_f(MP_FUNC,
  -                           "%s is not an FilterConnection handler\n",
  +                           "%s is not a FilterConnection handler\n",
                              handlers[i]->name);
                   continue;
               }
  @@ -585,6 +742,8 @@
                   (unsigned long)bucket,
                   (long)bucket->length,
                   (unsigned long)bucket->data);
  +        /* fprintf(fp, "       : %s\n", (char *)bucket->data); */
  +        
           i++;
       }
   #endif
  
  
  
  1.16      +15 -0     modperl-2.0/src/modules/perl/modperl_filter.h
  
  Index: modperl_filter.h
  ===================================================================
  RCS file: /home/cvs/modperl-2.0/src/modules/perl/modperl_filter.h,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- modperl_filter.h  12 Jan 2003 02:15:58 -0000      1.15
  +++ modperl_filter.h  15 Jan 2003 06:07:10 -0000      1.16
  @@ -42,6 +42,8 @@
   void modperl_output_filter_add_request(request_rec *r);
   
   MP_INLINE apr_status_t modperl_output_filter_flush(modperl_filter_t *filter);
  +MP_INLINE apr_status_t modperl_input_filter_flush(modperl_filter_t *filter);
  +
   
   MP_INLINE apr_size_t modperl_output_filter_read(pTHX_
                                                   modperl_filter_t *filter,
  @@ -64,5 +66,18 @@
   void modperl_input_filter_add_connection(conn_rec *c);
   
   void modperl_input_filter_add_request(request_rec *r);
  +
  +MP_INLINE apr_size_t modperl_input_filter_read(pTHX_
  +                                               modperl_filter_t *filter,
  +                                               ap_input_mode_t mode,
  +                                               apr_read_type_e block,
  +                                               apr_off_t readbytes,
  +                                               SV *buffer,
  +                                               apr_size_t wanted);
  +    
  +MP_INLINE apr_status_t modperl_input_filter_write(modperl_filter_t *filter,
  +                                                  const char *buf,
  +                                                  apr_size_t *len);
  +
   
   #endif /* MODPERL_FILTER_H */
  
  
  
  1.64      +3 -1      modperl-2.0/src/modules/perl/modperl_types.h
  
  Index: modperl_types.h
  ===================================================================
  RCS file: /home/cvs/modperl-2.0/src/modules/perl/modperl_types.h,v
  retrieving revision 1.63
  retrieving revision 1.64
  diff -u -r1.63 -r1.64
  --- modperl_types.h   25 Nov 2002 22:46:29 -0000      1.63
  +++ modperl_types.h   15 Jan 2003 06:07:10 -0000      1.64
  @@ -192,13 +192,15 @@
       apr_ssize_t remaining;
       modperl_wbucket_t wbucket;
       apr_bucket *bucket;
  -    apr_bucket_brigade *bb;
  +    apr_bucket_brigade *bb_in;
  +    apr_bucket_brigade *bb_out;
       apr_status_t rc;
       modperl_filter_mode_e mode;
       apr_pool_t *pool;
   } modperl_filter_t;
   
   typedef struct {
  +    int sent_eos;
       SV *data;
       modperl_handler_t *handler;
       PerlInterpreter *perl;
  
  
  
  1.94      +71 -0     modperl-2.0/xs/tables/current/ModPerl/FunctionTable.pm
  
  Index: FunctionTable.pm
  ===================================================================
  RCS file: /home/cvs/modperl-2.0/xs/tables/current/ModPerl/FunctionTable.pm,v
  retrieving revision 1.93
  retrieving revision 1.94
  diff -u -r1.93 -r1.94
  --- FunctionTable.pm  12 Jan 2003 02:33:28 -0000      1.93
  +++ FunctionTable.pm  15 Jan 2003 06:07:10 -0000      1.94
  @@ -3138,6 +3138,19 @@
     },
     {
       'return_type' => 'apr_status_t',
  +    'name' => 'modperl_input_filter_flush',
  +    'attr' => [
  +      '__inline__'
  +    ],
  +    'args' => [
  +      {
  +        'type' => 'modperl_filter_t *',
  +        'name' => 'filter'
  +      }
  +    ]
  +  },
  +  {
  +    'return_type' => 'apr_status_t',
       'name' => 'modperl_output_filter_handler',
       'args' => [
         {
  @@ -3176,6 +3189,43 @@
       ]
     },
     {
  +    'return_type' => 'apr_size_t',
  +    'name' => 'modperl_input_filter_read',
  +    'attr' => [
  +      '__inline__'
  +    ],
  +    'args' => [
  +      {
  +        'type' => 'PerlInterpreter *',
  +        'name' => 'my_perl'
  +      },
  +      {
  +        'type' => 'modperl_filter_t *',
  +        'name' => 'filter'
  +      },
  +      {
  +        'type' => 'ap_input_mode_t',
  +        'name' => 'mode'
  +      },
  +      {
  +        'type' => 'apr_read_type_e',
  +        'name' => 'block'
  +      },
  +      {
  +        'type' => 'apr_off_t',
  +        'name' => 'readbytes'
  +      },
  +      {
  +        'type' => 'SV *',
  +        'name' => 'buffer'
  +      },
  +      {
  +        'type' => 'apr_size_t',
  +        'name' => 'wanted'
  +      }
  +    ]
  +  },
  +  {
       'return_type' => 'void',
       'name' => 'modperl_output_filter_add_connection',
       'args' => [
  @@ -3198,6 +3248,27 @@
     {
       'return_type' => 'apr_status_t',
       'name' => 'modperl_output_filter_write',
  +    'attr' => [
  +      '__inline__'
  +    ],
  +    'args' => [
  +      {
  +        'type' => 'modperl_filter_t *',
  +        'name' => 'filter'
  +      },
  +      {
  +        'type' => 'const char *',
  +        'name' => 'buf'
  +      },
  +      {
  +        'type' => 'apr_size_t *',
  +        'name' => 'len'
  +      }
  +    ]
  +  },
  +  {
  +    'return_type' => 'apr_status_t',
  +    'name' => 'modperl_input_filter_write',
       'attr' => [
         '__inline__'
       ],
  
  
  


Reply via email to