costin      02/01/25 23:03:42

  Modified:    jk/native2/common jk_ajp14_worker.c
  Log:
  That's one of the biggest changes.
  
  First, endpoint management is now more explicit and direct: since this
  worker sends the request, it needs an endpoint, and it recycles that endpoint.
  
  Second, we treat jni as a special case, handled in a different way.
  What's nice is that almost all of the code is now shared - we get
  reused endpoints for jni and all the good stuff from ajp (including efficient
  c2b and less GC).
  
  Revision  Changes    Path
  1.14      +173 -125  jakarta-tomcat-connectors/jk/native2/common/jk_ajp14_worker.c
  
  Index: jk_ajp14_worker.c
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_ajp14_worker.c,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- jk_ajp14_worker.c 12 Jan 2002 05:05:12 -0000      1.13
  +++ jk_ajp14_worker.c 26 Jan 2002 07:03:42 -0000      1.14
  @@ -72,35 +72,7 @@
   #include "jk_service.h"
   #include "jk_env.h"
   #include "jk_objCache.h"
  -#include "jk_ajp14.h"
  -
  -int JK_METHOD jk_worker_ajp14_factory( jk_env_t *env, jk_pool_t *pool, void 
**result,
  -                                       const char *type, const char *name);
  -
  -static int JK_METHOD
  -jk_worker_ajp14_service(jk_env_t *env, jk_endpoint_t   *e,
  -                        jk_ws_service_t *s,
  -                        int *is_recoverable_error);
  -
  -static int JK_METHOD
  -jk_worker_ajp14_validate(jk_env_t *env, jk_worker_t *_this,
  -                         jk_map_t    *props,
  -                         jk_workerEnv_t *we );
  -
  -static int JK_METHOD
  -jk_worker_ajp14_done(jk_env_t *env, jk_endpoint_t *e);
  -
  -static int JK_METHOD
  -jk_worker_ajp14_getEndpoint(jk_env_t *env, jk_worker_t *_this,
  -                            jk_endpoint_t **e);
  -
  -static int JK_METHOD
  -jk_worker_ajp14_init(jk_env_t *env, jk_worker_t *_this,
  -                     jk_map_t    *props, 
  -                     jk_workerEnv_t *we);
  -
  -static int JK_METHOD
  -jk_worker_ajp14_destroy(jk_env_t *env, jk_worker_t *_this);
  +#include "jk_registry.h"
   
   
   #define AJP_DEF_RETRY_ATTEMPTS    (2)
  @@ -114,37 +86,6 @@
   
   /* -------------------- Impl -------------------- */
   
  -int JK_METHOD jk_worker_ajp14_factory( jk_env_t *env, jk_pool_t *pool,
  -                                       void **result,
  -                                       const char *type, const char *name)
  -{
  -    jk_worker_t *w=(jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t));
  -
  -    if (name == NULL || w == NULL) {
  -        env->l->jkLog(env, env->l, JK_LOG_ERROR,
  -                      "ajp14.factory() NullPointerException\n");
  -        return JK_FALSE;
  -    }
  -    w->pool = pool;
  -    w->name = NULL;
  -    
  -    w->proto= AJP14_PROTO;
  -
  -    w->endpointCache= NULL;
  -    w->connect_retry_attempts= AJP_DEF_RETRY_ATTEMPTS;
  -
  -    w->channel= NULL;
  -    w->secret= NULL;
  -   
  -    w->validate= jk_worker_ajp14_validate;
  -    w->init= jk_worker_ajp14_init;
  -    w->get_endpoint= jk_worker_ajp14_getEndpoint;
  -    w->destroy=jk_worker_ajp14_destroy;
  -
  -    *result = w;
  -
  -    return JK_TRUE;
  -}
   
   /*
    * Initialize the worker.
  @@ -199,13 +140,11 @@
        }
       }
       
  -    _this->channel->setProperty( env, _this->channel, "defaultPort", "8007" );
  -
       err=_this->channel->init( env, _this->channel, props, p->name, _this);
   
       if( err != JK_TRUE ) {
        env->l->jkLog(env, env->l, JK_LOG_ERROR,
  -                      "ajp14.validate(): resolve failed\n");
  +                      "ajp14.validate(): channel init failed\n");
        return err;
       }
   
  @@ -260,7 +199,7 @@
   
       jk_serialize_ping( env, msg, ae );
       
  -    err = msg->send( env, msg, ae );
  +    err = ae->worker->channel->send( env, ae->worker->channel, ae, msg );
   
       /* Move to 'slave' mode, listening to messages */
       err=ae->worker->workerEnv->processCallbacks( env, ae->worker->workerEnv,
  @@ -274,56 +213,17 @@
       return err;
   }
   
  -
  -/*
  - * Serve the request, using AJP13/AJP14
  - */
  -static int JK_METHOD
  -jk_worker_ajp14_service(jk_env_t *env, jk_endpoint_t   *e, 
  -                        jk_ws_service_t *s,
  -                        int  *is_recoverable_error)
  +/** First message in a stream-based connection. If the first send
  +    fails, try to reconnect.
  +*/
  +static int JK_METHOD
  +jk_worker_ajp14_sendAndReconnect(jk_env_t *env, jk_worker_t *worker,
  +                              jk_ws_service_t *s,
  +                              jk_endpoint_t   *e )
   {
  -    int err;
       int attempt;
  -    int hasPost=JK_FALSE;
  -
  -    if( ( e== NULL ) 
  -     || ( s == NULL )
  -        || ! is_recoverable_error ) {
  -     env->l->jkLog(env, env->l, JK_LOG_ERROR,
  -                      "ajp14.service() NullPointerException\n");
  -     return JK_FALSE;
  -    }
  -
  -    /* Prepare the messages we'll use.*/ 
  -    e->request->reset( env, e->request );
  -    e->reply->reset( env, e->reply );
  -    e->post->reset( env, e->post );
  +    int err;
       
  -    e->recoverable = JK_TRUE;
  -    e->uploadfd       = -1;          /* not yet used, later ;) */
  -    e->reuse = JK_FALSE;
  -    *is_recoverable_error = JK_TRUE;
  -    /* Up to now, we can recover */
  -    e->recoverable = JK_TRUE;
  -
  -    s->left_bytes_to_send = s->content_length;
  -    s->content_read=0;
  -
  -    /* 
  -     * We get here initial request (in reqmsg)
  -     */
  -    err=jk_serialize_request13(env, e->request, s);
  -    if (err!=JK_TRUE) {
  -     *is_recoverable_error = JK_FALSE;                
  -     env->l->jkLog(env, env->l, JK_LOG_ERROR,
  -                      "ajp14.service(): error marshaling\n");
  -     return JK_FALSE;
  -    }
  -
  -    env->l->jkLog(env, env->l, JK_LOG_INFO,
  -                  "ajp14.service() %s\n", e->worker->name);
  -
       /*
        * Try to send the request on a valid endpoint. If one endpoint
        * fails, close the channel and try again ( maybe tomcat was restarted )
  @@ -331,11 +231,12 @@
        * XXX JK_RETRIES could be replaced by the number of workers in
        * a load-balancing configuration 
        */
  -    for(attempt = 0 ; attempt < e->worker->connect_retry_attempts ;attempt++) {
  -        jk_channel_t *channel=e->worker->channel;
  +    for(attempt = 0 ; attempt < worker->connect_retry_attempts ;attempt++) {
  +        jk_channel_t *channel= worker->channel;
   
           /* e->request->dump(env, e->request, "Before sending "); */
  -        err=e->request->send( env, e->request, e);
  +        err=e->worker->channel->send( env, e->worker->channel, e,
  +                                      e->request );
   
        if (err==JK_TRUE ) {
               /* We sent the request, have valid endpoint */
  @@ -345,12 +246,12 @@
           if( e->recoverable != JK_TRUE ) {
               env->l->jkLog(env, env->l, JK_LOG_ERROR,
                        "ajp14.service() error sending request %s, giving up\n",
  -                     e->worker->name);
  +                     worker->name);
               return JK_FALSE;
           }
           
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
  -                 "ajp14.service() error sending, retry on a new endpoint %s\n",
  +                "ajp14.service() error sending, retry on a new endpoint %s\n",
                         e->worker->name);
   
           channel->close( env, channel, e );
  @@ -370,9 +271,23 @@
            */
           e->recoverable = JK_FALSE;
       }
  +    return JK_TRUE;
  +}
  +
  +
  +static int JK_METHOD
  +jk_worker_ajp14_forwardStream(jk_env_t *env, jk_worker_t *worker,
  +                              jk_ws_service_t *s,
  +                              jk_endpoint_t   *e )
  +{
  +    int err;
  +
  +    err=jk_worker_ajp14_sendAndReconnect( env, worker, s, e );
  +    if( err!=JK_TRUE )
  +        return err;
       
       /* We should have a channel now, send the post data */
  -    *is_recoverable_error = JK_TRUE;
  +    s->is_recoverable_error = JK_TRUE;
       e->recoverable = JK_TRUE;
   
       /* Prepare to send some post data ( ajp13 proto ). We do that after the
  @@ -390,12 +305,13 @@
           if (err != JK_TRUE ) {
               /* the browser stop sending data, no need to recover */
               e->recoverable = JK_FALSE;
  +            s->is_recoverable_error = JK_FALSE;
               env->l->jkLog(env, env->l, JK_LOG_ERROR,
                             "ajp14.service() Error receiving initial post \n");
               return JK_FALSE;
           }
  -
  -        err= e->post->send( env, e->post, e );
  +        err= e->worker->channel->send( env, e->worker->channel, e,
  +                                       e->post );
       }
   
       err = e->worker->workerEnv->processCallbacks(env, e->worker->workerEnv,
  @@ -406,7 +322,7 @@
        * upload data and we must consider that operation is no more recoverable
        */
       if (! e->recoverable) {
  -        *is_recoverable_error = JK_FALSE;
  +        s->is_recoverable_error = JK_FALSE;
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
                         "ajp14.service() ajpGetReply unrecoverable error %d\n",
                         err);
  @@ -418,13 +334,95 @@
                         "ajp14.service() ajpGetReply recoverable error %d\n",
                         err);
       }
  +    return err;
  +}
  +
  +static int JK_METHOD
  +jk_worker_ajp14_forwardSingleThread(jk_env_t *env, jk_worker_t *worker,
  +                                    jk_ws_service_t *s,
  +                                    jk_endpoint_t   *e )
  +{
  +    int err;
  +
  +    env->l->jkLog(env, env->l, JK_LOG_INFO,
  +                  "ajp14.forwardST() Before calling native channel\n");
  +    err=e->worker->channel->send( env, e->worker->channel, e,
  +                                  e->request );
  +        env->l->jkLog(env, env->l, JK_LOG_INFO,
  +                  "ajp14.forwardST() After %d\n",err);
  +    
  +    
  +    return JK_TRUE;
  +}
  +     
  +static int JK_METHOD
  +jk_worker_ajp14_service1(jk_env_t *env, jk_worker_t *w,
  +                        jk_ws_service_t *s,
  +                        jk_endpoint_t   *e )
  +{
  +    int err;
  +
  +    if( ( e== NULL ) || ( s == NULL ) ) {
  +     env->l->jkLog(env, env->l, JK_LOG_ERROR,
  +                      "ajp14.service() NullPointerException\n");
  +     return JK_FALSE;
  +    }
  +
  +    e->currentRequest=s;
  +    
  +    /* Prepare the messages we'll use.*/ 
  +    e->request->reset( env, e->request );
  +    e->reply->reset( env, e->reply );
  +    e->post->reset( env, e->post );
  +    
  +    e->uploadfd       = -1;          /* not yet used, later ;) */
  +    e->reuse = JK_FALSE;
  +    s->is_recoverable_error = JK_TRUE;
  +    /* Up to now, we can recover */
  +    e->recoverable = JK_TRUE;
  +
  +    s->left_bytes_to_send = s->content_length;
  +    s->content_read=0;
  +
  +    /* 
  +     * We get here initial request (in reqmsg)
  +     */
  +    err=jk_serialize_request13(env, e->request, s);
  +    if (err!=JK_TRUE) {
  +     s->is_recoverable_error = JK_FALSE;                
  +     env->l->jkLog(env, env->l, JK_LOG_ERROR,
  +                      "ajp14.service(): error marshaling\n");
  +     return JK_FALSE;
  +    }
  +
  +    env->l->jkLog(env, env->l, JK_LOG_INFO,
  +                  "ajp14.service() %s\n", e->worker->name);
  +
  +    if( w->channel->beforeRequest != NULL ) {
  +        w->channel->beforeRequest( env, w->channel, w, e, s );
  +    }
  +    
  +    /* First message for this request */
  +    if( w->channel->is_stream == JK_TRUE ) {
  +        err=jk_worker_ajp14_forwardStream( env, w, s, e );
  +    } else {
  +        err=jk_worker_ajp14_forwardSingleThread( env, w, s, e );
  +    }
  +
  +    if( w->channel->afterRequest != NULL ) {
  +        w->channel->afterRequest( env, w->channel, w, e, s );
  +    }
  +
  +    e->currentRequest=NULL;
       
       return err;
   }
   
   
  +
  +
   static int JK_METHOD
  -jk_worker_ajp14_done(jk_env_t *env, jk_endpoint_t *e)
  +jk_worker_ajp14_done(jk_env_t *env, jk_worker_t *we, jk_endpoint_t *e)
   {
       jk_worker_t *w;
       
  @@ -449,7 +447,7 @@
       env->l->jkLog(env, env->l, JK_LOG_INFO,
                     "ajp14.done() close endpoint %s\n",
                     w->name );
  -
  +    
       return JK_TRUE;
   }
   
  @@ -501,13 +499,31 @@
       e->worker = _this;
       e->proto = _this->proto;
       e->channelData = NULL;
  -    e->service = jk_worker_ajp14_service;
  -    e->done = jk_worker_ajp14_done;
       
       *eP = e;
       return JK_TRUE;
   }
   
  +/*
  + * Serve the request, using AJP13/AJP14
  + */
  +static int JK_METHOD
  +jk_worker_ajp14_service(jk_env_t *env, jk_worker_t *w,
  +                        jk_ws_service_t *s)
  +{
  +    int err;
  +    jk_endpoint_t   *e;
  +
  +    /* Get endpoint from the pool */
  +    jk_worker_ajp14_getEndpoint( env, w, &e );
  +
  +    err=jk_worker_ajp14_service1( env, w, s, e );
  +
  +    jk_worker_ajp14_done( env, w, e);
  +    return err;
  +}
  +
  +
   static int JK_METHOD
   jk_worker_ajp14_init(jk_env_t *env, jk_worker_t *_this,
                        jk_map_t    *props, 
  @@ -597,7 +613,7 @@
   {
       int i;
       
  -    env->l->jkLog(env, env->l, JK_LOG_DEBUG,
  +    env->l->jkLog(env, env->l, JK_LOG_INFO,
                     "ajp14.destroy()\n");
   
       if( _this->endpointCache != NULL ) {
  @@ -622,6 +638,38 @@
       }
   
       _this->pool->close( env, _this->pool );
  +
  +    return JK_TRUE;
  +}
  +
  +int JK_METHOD jk_worker_ajp14_factory( jk_env_t *env, jk_pool_t *pool,
  +                                       void **result,
  +                                       const char *type, const char *name)
  +{
  +    jk_worker_t *w=(jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t));
  +
  +    if (name == NULL || w == NULL) {
  +        env->l->jkLog(env, env->l, JK_LOG_ERROR,
  +                      "ajp14.factory() NullPointerException\n");
  +        return JK_FALSE;
  +    }
  +    w->pool = pool;
  +    w->name = NULL;
  +    
  +    w->proto= AJP14_PROTO;
  +
  +    w->endpointCache= NULL;
  +    w->connect_retry_attempts= AJP_DEF_RETRY_ATTEMPTS;
  +
  +    w->channel= NULL;
  +    w->secret= NULL;
  +   
  +    w->validate= jk_worker_ajp14_validate;
  +    w->init= jk_worker_ajp14_init;
  +    w->destroy=jk_worker_ajp14_destroy;
  +    w->service = jk_worker_ajp14_service;
  +
  +    *result = w;
   
       return JK_TRUE;
   }
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to